You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 17:46:08 UTC

[06/50] [abbrv] hadoop git commit: HDFS-8990. Move RemoteBlockReader to hdfs-client module. Contributed by Mingliang Liu.

HDFS-8990. Move RemoteBlockReader to hdfs-client module. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/826ae1c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/826ae1c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/826ae1c2

Branch: refs/heads/YARN-3926
Commit: 826ae1c26d31f87d88efc920b271bec7eec2e17a
Parents: caa04de
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Aug 31 13:54:14 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Aug 31 13:54:14 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 512 +++++++++++++++++++
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 480 +++++++++++++++++
 .../protocol/datatransfer/PacketHeader.java     | 214 ++++++++
 .../protocol/datatransfer/PacketReceiver.java   | 310 +++++++++++
 .../hdfs/util/ByteBufferOutputStream.java       |  49 ++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   1 -
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 508 ------------------
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 477 -----------------
 .../protocol/datatransfer/PacketHeader.java     | 214 --------
 .../protocol/datatransfer/PacketReceiver.java   | 310 -----------
 .../hdfs/util/ByteBufferOutputStream.java       |  49 --
 .../hdfs/TestClientBlockVerification.java       |   4 +-
 13 files changed, 1570 insertions(+), 1561 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
new file mode 100644
index 0000000..7509da5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+ * It will be removed in the next release.
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+  static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
+
+  private final Peer peer;
+  private final DatanodeID datanodeID;
+  private final DataInputStream in;
+  private DataChecksum checksum;
+
+  /** offset in block of the last chunk received */
+  private long lastChunkOffset = -1;
+  private long lastChunkLen = -1;
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+
+  private final long blockId;
+
+  /** offset in block of the first chunk - may be less than startOffset
+      if startOffset is not chunk-aligned */
+  private final long firstChunkOffset;
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private final long bytesNeededToFinish;
+  
+  /**
+   * True if we are reading from a local DataNode.
+   */
+  private final boolean isLocal;
+
+  private boolean eos = false;
+  private boolean sentStatusCode = false;
+  
+  ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
+  int dataLeft = 0;
+  
+  private final PeerCache peerCache;
+  
+  /* FSInputChecker interface */
+  
+  /**
+   * Bulk read with the same contract as java.io.InputStream#read(byte[], int, int);
+   * used by DFSInputStream#read().
+   *
+   * One rule is violated when a checksum error occurs: "Read should not
+   * modify user buffer before successful read". The data is copied into the
+   * user buffer first and only then checked against the checksum.
+   *
+   * @param buf destination buffer
+   * @param off index in {@code buf} at which to start storing bytes
+   * @param len maximum number of bytes to read
+   * @return whatever the superclass read returns for this request
+   * @throws IOException if the leading padding cannot be discarded or the
+   *         underlying read fails
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) 
+                               throws IOException {
+    // Snapshot the end-of-stream flag *before* the skip below: the skip
+    // itself can hit EOS when the entire read is smaller than one
+    // checksum chunk.
+    final boolean wasAtEos = eos;
+
+    // First read only: discard the bytes between the chunk boundary and the
+    // offset the caller actually asked for.
+    final boolean nothingReadYet = lastChunkLen < 0;
+    if (nothingReadYet && startOffset > firstChunkOffset && len > 0) {
+      final int padding = (int)(startOffset - firstChunkOffset);
+      // Goes through super.readAndDiscard(); this.skip() must not be used here.
+      if (super.readAndDiscard(padding) != padding) {
+        // should never happen
+        throw new IOException("Could not skip required number of bytes");
+      }
+    }
+
+    final int nRead = super.read(buf, off, len);
+
+    // If this call is the one that reached end-of-stream, report the
+    // outcome of the read to the DN.
+    final boolean reachedEosNow = eos && !wasAtEos;
+    if (reachedEosNow && nRead >= 0) {
+      sendReadResult(peer,
+          needChecksum() ? Status.CHECKSUM_OK : Status.SUCCESS);
+    }
+    return nRead;
+  }
+
+  /**
+   * Skips up to {@code n} bytes by reading and discarding them.
+   *
+   * @param n number of bytes the caller wants to skip
+   * @return number of bytes actually discarded; less than {@code n} if a
+   *         discard attempt made no progress
+   * @throws IOException if reading the discarded bytes fails
+   */
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in majority of the cases?. This one throws. */
+    long total = 0;
+    while (total < n) {
+      // readAndDiscard() takes an int, so cap each request at Integer.MAX_VALUE.
+      final long remaining = n - total;
+      final int request =
+          remaining > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) remaining;
+      final int discarded = readAndDiscard(request);
+      if (discarded <= 0) {
+        break;
+      }
+      total += discarded;
+    }
+    return total;
+  }
+
+  /**
+   * Single-byte reads are not supported by this reader.
+   *
+   * @throws IOException always; callers must use read(buf, off, len)
+   */
+  @Override
+  public int read() throws IOException {
+    final String message = "read() is not expected to be invoked. " +
+                           "Use read(buf, off, len) instead.";
+    throw new IOException(message);
+  }
+  
+  /**
+   * This reader never switches to another source.
+   *
+   * @param targetPos ignored
+   * @return always {@code false}
+   */
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    // Checksum errors are handled outside the BlockReader.
+    // DFSInputStream does not always call 'seekToNewSource': in the case of
+    // pread() it just tries a different replica without seeking.
+    return false;
+  }
+  
+  /**
+   * Seeking is not supported by this reader.
+   *
+   * @param pos ignored
+   * @throws IOException always
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    final String reason = "Seek() is not supported in BlockInputChecker";
+    throw new IOException(reason);
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+                               "since seek is not required");
+  }
+  
+  /**
+   * Makes sure that checksumBytes has enough capacity 
+   * and limit is set to the number of checksum bytes needed 
+   * to be read.
+   */
+  private void adjustChecksumBytes(int dataLen) {
+    int requiredSize = 
+      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+    } else {
+      checksumBytes.clear();
+    }
+    checksumBytes.limit(requiredSize);
+  }
+  
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
+                                       int len, byte[] checksumBuf) 
+                                       throws IOException {
+    TraceScope scope =
+        Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
+            Sampler.NEVER);
+    try {
+      return readChunkImpl(pos, buf, offset, len, checksumBuf);
+    } finally {
+      scope.close();
+    }
+  }
+
+  /**
+   * Reads the next run of chunk data, together with its checksums, from the
+   * datanode stream. A new packet header is fetched first whenever the
+   * previous packet has been fully consumed.
+   *
+   * @param pos offset of this read relative to the first chunk of the read;
+   *            {@code pos + firstChunkOffset} must equal the chunk offset
+   *            this reader expects next
+   * @param buf destination buffer for the data bytes
+   * @param offset index in {@code buf} at which to start storing data
+   * @param len maximum number of data bytes that may be stored
+   * @param checksumBuf destination buffer for the checksum bytes
+   * @return number of data bytes read, or -1 if the end of the read was
+   *         already reached or no bytes were read by this call
+   * @throws IOException if {@code pos} does not match the expected offset,
+   *         a packet header fails its sanity check, the end-of-read packet
+   *         is not the expected empty last packet, or the stream fails
+   */
+  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
+                                     int len, byte[] checksumBuf)
+                                     throws IOException {
+    // Read one chunk.
+    if (eos) {
+      // Already hit EOF
+      return -1;
+    }
+    
+    // Read one DATA_CHUNK.
+    // The expected offset is where the previous read ended; lastChunkLen is
+    // negative until the first read has happened, so it is not added then.
+    long chunkOffset = lastChunkOffset;
+    if ( lastChunkLen > 0 ) {
+      chunkOffset += lastChunkLen;
+    }
+    
+    // pos is relative to the start of the first chunk of the read.
+    // chunkOffset is relative to the start of the block.
+    // This makes sure that the read passed from FSInputChecker is
+    // for the same chunk we expect to be reading from the DN.
+    if ( (pos + firstChunkOffset) != chunkOffset ) {
+      throw new IOException("Mismatch in pos : " + pos + " + " + 
+                            firstChunkOffset + " != " + chunkOffset);
+    }
+
+    // Read next packet if the previous packet has been read completely.
+    if (dataLeft <= 0) {
+      //Read packet headers.
+      PacketHeader header = new PacketHeader();
+      header.readFields(in);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DFSClient readChunk got header " + header);
+      }
+
+      // Sanity check the lengths
+      if (!header.sanityCheck(lastSeqNo)) {
+           throw new IOException("BlockReader: error in packet header " +
+                                 header);
+      }
+
+      lastSeqNo = header.getSeqno();
+      dataLeft = header.getDataLen();
+      // Size checksumBytes for this packet, then read all of the packet's
+      // checksums in one go; they are handed out per read further below.
+      adjustChecksumBytes(header.getDataLen());
+      if (header.getDataLen() > 0) {
+        IOUtils.readFully(in, checksumBytes.array(), 0,
+                          checksumBytes.limit());
+      }
+    }
+
+    // Sanity checks
+    assert len >= bytesPerChecksum;
+    assert checksum != null;
+    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+    int checksumsToRead, bytesToRead;
+
+    if (checksumSize > 0) {
+
+      // How many chunks left in our packet - this is a ceiling
+      // since we may have a partial chunk at the end of the file
+      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+      // How many chunks we can fit in databuffer
+      //  - note this is a floor since we always read full chunks
+      int chunksCanFit = Math.min(len / bytesPerChecksum,
+                                  checksumBuf.length / checksumSize);
+
+      // How many chunks should we read
+      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+      // How many bytes should we actually read
+      bytesToRead = Math.min(
+        checksumsToRead * bytesPerChecksum, // full chunks
+        dataLeft); // in case we have a partial
+    } else {
+      // no checksum
+      bytesToRead = Math.min(dataLeft, len);
+      checksumsToRead = 0;
+    }
+
+    if ( bytesToRead > 0 ) {
+      // Assert we have enough space
+      assert bytesToRead <= len;
+      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+      assert checksumBuf.length >= checksumSize * checksumsToRead;
+      // Data comes from the stream; the matching checksums come from the
+      // buffer that was filled when the packet header was read.
+      IOUtils.readFully(in, buf, offset, bytesToRead);
+      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+    }
+
+    dataLeft -= bytesToRead;
+    assert dataLeft >= 0;
+
+    // Remember where this read started and how long it was, so the next
+    // call can compute the offset it expects.
+    lastChunkOffset = chunkOffset;
+    lastChunkLen = bytesToRead;
+
+    // If there's no data left in the current packet after satisfying
+    // this read, and we have satisfied the client read, we expect
+    // an empty packet header from the DN to signify this.
+    // Note that pos + bytesToRead may in fact be greater since the
+    // DN finishes off the entire last chunk.
+    if (dataLeft == 0 &&
+        pos + bytesToRead >= bytesNeededToFinish) {
+
+      // Read header
+      PacketHeader hdr = new PacketHeader();
+      hdr.readFields(in);
+
+      if (!hdr.isLastPacketInBlock() ||
+          hdr.getDataLen() != 0) {
+        throw new IOException("Expected empty end-of-read packet! Header: " +
+                              hdr);
+      }
+
+      eos = true;
+    }
+
+    // Nothing was read by this call: report end of data to the caller.
+    if ( bytesToRead == 0 ) {
+      return -1;
+    }
+
+    return bytesToRead;
+  }
+  
+  /**
+   * Builds a reader over an already-negotiated datanode stream; instances
+   * are created through {@link #newBlockReader}.
+   *
+   * @param file file name, used only to label the block in the Path below
+   * @param bpid block pool id, used only in that label
+   * @param blockId id of the block being read
+   * @param in stream the packets are read from
+   * @param checksum checksum descriptor reported for this read
+   * @param verifyChecksum whether the superclass should verify checksums
+   * @param startOffset offset in the block the caller wants to read from
+   * @param firstChunkOffset offset in the block of the first chunk sent
+   * @param bytesToRead number of bytes the caller wants
+   * @param peer connection to the datanode
+   * @param datanodeID datanode being read from
+   * @param peerCache cache the peer may be returned to on close
+   */
+  private RemoteBlockReader(String file, String bpid, long blockId,
+      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID, PeerCache peerCache) {
+    // The Path exists only so that debug output can name the block and file.
+    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
+                    ":" + bpid + ":of:"+ file)/*too non path-like?*/,
+          1, verifyChecksum,
+          checksum.getChecksumSize() > 0? checksum : null, 
+          checksum.getBytesPerChecksum(),
+          checksum.getChecksumSize());
+
+    this.isLocal = DFSUtilClient.isLocalAddress(
+        NetUtils.createSocketAddr(datanodeID.getXferAddr()));
+
+    // Connection state.
+    this.peer = peer;
+    this.datanodeID = datanodeID;
+    this.peerCache = peerCache;
+    this.in = in;
+
+    // Checksum parameters.
+    this.checksum = checksum;
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    this.checksumSize = checksum.getChecksumSize();
+
+    // Position bookkeeping.
+    this.blockId = blockId;
+    this.startOffset = Math.max( startOffset, 0 );
+    this.firstChunkOffset = firstChunkOffset;
+    this.lastChunkOffset = firstChunkOffset;
+    this.lastChunkLen = -1;
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    // This uses the startOffset argument as passed in, not the clamped field.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+  }
+
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request and reads the
+   * datanode's response before constructing the reader.
+   *
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @param peer  Connection to the datanode; the request is written to its
+   *              output stream and the response is read from its input stream
+   * @param datanodeID  The datanode being read from
+   * @param peerCache  Cache handed to the new reader
+   * @param cachingStrategy  Caching strategy forwarded with the read request
+   * @return New BlockReader instance.
+   * @throws IOException if the request cannot be sent, the response cannot
+   *         be read, or the first chunk offset in the response is invalid;
+   *         RemoteBlockReader2.checkSuccess presumably also throws on a
+   *         non-success response (not visible here, confirm)
+   */
+  public static RemoteBlockReader newBlockReader(String file,
+                                     ExtendedBlock block, 
+                                     Token<BlockTokenIdentifier> blockToken,
+                                     long startOffset, long len,
+                                     int bufferSize, boolean verifyChecksum,
+                                     String clientName, Peer peer,
+                                     DatanodeID datanodeID,
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy)
+                                       throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum, cachingStrategy);
+    
+    //
+    // Get bytes in block, set streams
+    //
+
+    DataInputStream in = new DataInputStream(
+        new BufferedInputStream(peer.getInputStream(), bufferSize));
+    
+    // The response is a varint-length-prefixed BlockOpResponseProto.
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    RemoteBlockReader2.checkSuccess(status, peer, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    //Warning when we get CHECKSUM_NULL?
+    
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+    
+    // The first chunk must start at or before startOffset, and less than
+    // one full chunk (bytesPerChecksum) before it.
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+                            firstChunkOffset + ") startOffset is " + 
+                            startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+        peer, datanodeID, peerCache);
+  }
+
+  /**
+   * Releases this reader. The peer is handed to the peer cache only when a
+   * cache was supplied and a read status has been sent to the datanode;
+   * otherwise the peer is closed.
+   *
+   * @throws IOException if closing the peer fails
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    startOffset = -1;
+    checksum = null;
+    // Conditional AND (&&), not bitwise '&': same result for booleans, but
+    // it is the idiomatic operator and matches RemoteBlockReader2#close().
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+  
+  /**
+   * Reads exactly {@code amtToRead} bytes from this reader by delegating to
+   * IOUtils.readFully.
+   *
+   * @param buf destination buffer
+   * @param readOffset index in {@code buf} at which to start storing bytes
+   * @param amtToRead number of bytes to read
+   * @throws IOException if the delegate read fails
+   */
+  @Override
+  public void readFully(byte[] buf, int readOffset, int amtToRead)
+      throws IOException {
+    // This reader is itself the stream being drained.
+    IOUtils.readFully(this, buf, readOffset, amtToRead);
+  }
+
+  /**
+   * Reads into {@code buf} via the four-argument {@code readFully} helper
+   * and returns that helper's result.
+   *
+   * @param buf destination buffer
+   * @param offset index in {@code buf} at which to start storing bytes
+   * @param len number of bytes requested
+   * @return the value returned by the helper
+   * @throws IOException if the read fails
+   */
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    final int total = readFully(this, buf, offset, len);
+    return total;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Peer peer, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
+    try {
+      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+               peer.getRemoteAddressString() + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Reading into a ByteBuffer is not supported by this reader.
+   *
+   * @param buf ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    final String reason = "readDirect unsupported in RemoteBlockReader";
+    throw new UnsupportedOperationException(reason);
+  }
+  
+  /**
+   * Gives an optimistic estimate of how much data can be read without doing
+   * network I/O.
+   *
+   * @return the constant RemoteBlockReader2.TCP_WINDOW_SIZE
+   */
+  @Override
+  public int available() throws IOException {
+    final int estimate = RemoteBlockReader2.TCP_WINDOW_SIZE;
+    return estimate;
+  }
+
+  /**
+   * @return the locality flag computed when this reader was constructed;
+   *         true if we are reading from a local DataNode
+   */
+  @Override
+  public boolean isLocal() {
+    return this.isLocal;
+  }
+  
+  /**
+   * @return always {@code false} for this reader
+   */
+  @Override
+  public boolean isShortCircuit() {
+    final boolean shortCircuit = false;
+    return shortCircuit;
+  }
+
+  /**
+   * This reader never provides a memory map.
+   *
+   * @param opts ignored
+   * @return always {@code null}
+   */
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
new file mode 100644
index 0000000..5541e6d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.EnumSet;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a wrapper around connection to datanode
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ *
+ * This is a new implementation introduced in Hadoop 0.23 which
+ * is more efficient and simpler than the older BlockReader
+ * implementation. It should be renamed to RemoteBlockReader
+ * once we are confident in it.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader2  implements BlockReader {
+
+  static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
+  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
+
+  final private Peer peer;
+  final private DatanodeID datanodeID;
+  final private PeerCache peerCache;
+  final private long blockId;
+  private final ReadableByteChannel in;
+
+  private DataChecksum checksum;
+  private final PacketReceiver packetReceiver = new PacketReceiver(true);
+
+  private ByteBuffer curDataSlice = null;
+
+  /** sequence number of the last packet received */
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private long bytesNeededToFinish;
+
+  /**
+   * True if we are reading from a local DataNode.
+   */
+  private final boolean isLocal;
+
+  private final boolean verifyChecksum;
+
+  private boolean sentStatusCode = false;
+
+  /**
+   * Exposes the underlying peer; marked as visible for testing.
+   *
+   * @return the peer this reader was constructed with
+   */
+  @VisibleForTesting
+  public Peer getPeer() {
+    return this.peer;
+  }
+  
+  /**
+   * Copies up to {@code len} bytes of the current packet's data into
+   * {@code buf}. A new packet is received first when no packet has been
+   * read yet, or when the current one is exhausted and more data is still
+   * expected.
+   *
+   * @param buf destination buffer
+   * @param off index in {@code buf} at which to start storing bytes
+   * @param len maximum number of bytes to copy
+   * @return number of bytes copied, or -1 when the current data slice is
+   *         empty (end of the read)
+   * @throws IOException if receiving the next packet fails
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) 
+                               throws IOException {
+
+    UUID randomId = null;
+    if (LOG.isTraceEnabled()) {
+      randomId = UUID.randomUUID();
+      LOG.trace(String.format("Starting read #%s file %s from datanode %s",
+        randomId.toString(), this.filename,
+        this.datanodeID.getHostName()));
+    }
+
+    // Explicit parentheses: '&&' already binds tighter than '||', this only
+    // makes the intended grouping obvious to the reader.
+    if (curDataSlice == null ||
+        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+      TraceScope scope = Trace.startSpan(
+          "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+      try {
+        readNextPacket();
+      } finally {
+        scope.close();
+      }
+    }
+
+    if (LOG.isTraceEnabled()) {
+      // Pass the id as a format argument instead of splicing it into the
+      // format string itself.
+      LOG.trace(String.format("Finishing read #%s", randomId));
+    }
+
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+    
+    int nRead = Math.min(curDataSlice.remaining(), len);
+    curDataSlice.get(buf, off, nRead);
+    
+    return nRead;
+  }
+
+
+  /**
+   * Copies as many bytes as fit from the current packet's data into
+   * {@code buf}, receiving the next packet first when needed.
+   *
+   * @param buf destination buffer
+   * @return number of bytes copied, or -1 when the current data slice is
+   *         empty (end of the read)
+   * @throws IOException if receiving the next packet fails
+   */
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    final boolean noPacketYet = curDataSlice == null;
+    if (noPacketYet ||
+        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+      final String spanName =
+          "RemoteBlockReader2#readNextPacket(" + blockId + ")";
+      final TraceScope span = Trace.startSpan(spanName, Sampler.NEVER);
+      try {
+        readNextPacket();
+      } finally {
+        span.close();
+      }
+    }
+
+    final int available = curDataSlice.remaining();
+    if (available == 0) {
+      // we're at EOF now
+      return -1;
+    }
+
+    final int nRead = Math.min(available, buf.remaining());
+    // Copy through a duplicate whose limit caps the transfer at nRead bytes,
+    // then advance the real slice to where the duplicate ended up.
+    final ByteBuffer window = curDataSlice.duplicate();
+    window.limit(window.position() + nRead);
+    buf.put(window);
+    curDataSlice.position(window.position());
+
+    return nRead;
+  }
+
+  /**
+   * Receives the next packet from the datanode and makes its data the
+   * current data slice. When checksum verification is enabled the packet's
+   * data is verified against its checksums. Once all bytes needed for the
+   * read have arrived, the trailing empty packet is consumed and a read
+   * status is sent to the datanode.
+   *
+   * @throws IOException if the packet cannot be received, its header fails
+   *         the sanity check, or the trailing packet is not as expected;
+   *         checksum verification may presumably also throw (confirm
+   *         against DataChecksum#verifyChunkedSums)
+   */
+  private void readNextPacket() throws IOException {
+    //Read packet headers.
+    packetReceiver.receiveNextPacket(in);
+
+    PacketHeader curHeader = packetReceiver.getHeader();
+    curDataSlice = packetReceiver.getDataSlice();
+    assert curDataSlice.capacity() == curHeader.getDataLen();
+    
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("DFSClient readNextPacket got header " + curHeader);
+    }
+
+    // Sanity check the lengths
+    if (!curHeader.sanityCheck(lastSeqNo)) {
+         throw new IOException("BlockReader: error in packet header " +
+                               curHeader);
+    }
+    
+    if (curHeader.getDataLen() > 0) {
+      // One checksum per chunk; a trailing partial chunk still counts as one.
+      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+      int checksumsLen = chunks * checksumSize;
+
+      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+        "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
+          " checksumsLen=" + checksumsLen;
+      
+      // Note: the sequence number is only recorded for packets carrying data.
+      lastSeqNo = curHeader.getSeqno();
+      if (verifyChecksum && curDataSlice.remaining() > 0) {
+        // N.B.: the checksum error offset reported here is actually
+        // relative to the start of the block, not the start of the file.
+        // This is slightly misleading, but preserves the behavior from
+        // the older BlockReader.
+        checksum.verifyChunkedSums(curDataSlice,
+            packetReceiver.getChecksumSlice(),
+            filename, curHeader.getOffsetInBlock());
+      }
+      bytesNeededToFinish -= curHeader.getDataLen();
+    }    
+    
+    // First packet will include some data prior to the first byte
+    // the user requested. Skip it.
+    if (curHeader.getOffsetInBlock() < startOffset) {
+      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+      curDataSlice.position(newPos);
+    }
+
+    // If we've now satisfied the whole client read, read one last packet
+    // header, which should be empty
+    if (bytesNeededToFinish <= 0) {
+      readTrailingEmptyPacket();
+      // Report the outcome to the DN: CHECKSUM_OK when checksums were
+      // verified, plain SUCCESS otherwise.
+      if (verifyChecksum) {
+        sendReadResult(Status.CHECKSUM_OK);
+      } else {
+        sendReadResult(Status.SUCCESS);
+      }
+    }
+  }
+  
+  /**
+   * Skips up to {@code n} bytes by advancing through the data of the
+   * received packets, fetching further packets as needed.
+   *
+   * @param n number of bytes the caller wants to skip
+   * @return number of bytes actually skipped; less than {@code n} if the
+   *         end of the read is reached first
+   * @throws IOException if receiving a packet fails
+   */
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in majority of the cases?. This one throws. */
+    long total = 0;
+    while (total < n) {
+      final long stillWanted = n - total;
+      final boolean noPacketYet = curDataSlice == null;
+      if (noPacketYet ||
+          (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+        readNextPacket();
+      }
+
+      final int available = curDataSlice.remaining();
+      if (available == 0) {
+        // we're at EOF now
+        break;
+      }
+
+      // Advance within the current packet, by at most what is still wanted.
+      final int step = (int)Math.min(available, stillWanted);
+      curDataSlice.position(curDataSlice.position() + step);
+      total += step;
+    }
+    return total;
+  }
+
+  private void readTrailingEmptyPacket() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reading empty packet at end of read");
+    }
+    
+    packetReceiver.receiveNextPacket(in);
+
+    PacketHeader trailer = packetReceiver.getHeader();
+    if (!trailer.isLastPacketInBlock() ||
+       trailer.getDataLen() != 0) {
+      throw new IOException("Expected empty end-of-read packet! Header: " +
+                            trailer);
+    }
+  }
+
+  protected RemoteBlockReader2(String file, String bpid, long blockId,
+      DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID, PeerCache peerCache) {
+    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
+        createSocketAddr(datanodeID.getXferAddr()));
+    // Path is used only for printing block and file information in debug
+    this.peer = peer;
+    this.datanodeID = datanodeID;
+    this.in = peer.getInputStreamChannel();
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max( startOffset, 0 );
+    this.filename = file;
+    this.peerCache = peerCache;
+    this.blockId = blockId;
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    packetReceiver.close();
+    startOffset = -1;
+    checksum = null;
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+  
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
+    try {
+      writeReadResult(peer.getOutputStream(), statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+               peer.getRemoteAddressString() + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Serialize the actual read result on the wire.
+   */
+  static void writeReadResult(OutputStream out, Status statusCode)
+      throws IOException {
+    
+    ClientReadStatusProto.newBuilder()
+      .setStatus(statusCode)
+      .build()
+      .writeDelimitedTo(out);
+
+    out.flush();
+  }
+  
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+  
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @param peer  The Peer to use
+   * @param datanodeID  The DatanodeID this peer is connected to
+   * @return New BlockReader instance; errors are reported via IOException, not null.
+   */
+  public static BlockReader newBlockReader(String file,
+                                     ExtendedBlock block,
+                                     Token<BlockTokenIdentifier> blockToken,
+                                     long startOffset, long len,
+                                     boolean verifyChecksum,
+                                     String clientName,
+                                     Peer peer, DatanodeID datanodeID,
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy) throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+          peer.getOutputStream()));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum, cachingStrategy);
+
+    //
+    // Read the datanode's response to the read request
+    //
+    DataInputStream in = new DataInputStream(peer.getInputStream());
+
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    checkSuccess(status, peer, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+                            firstChunkOffset + ") startOffset is " +
+                            startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
+        checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
+        datanodeID, peerCache);
+  }
+
+  static void checkSuccess(
+      BlockOpResponseProto status, Peer peer,
+      ExtendedBlock block, String file)
+      throws IOException {
+    String logInfo = "for OP_READ_BLOCK"
+      + ", self=" + peer.getLocalAddressString()
+      + ", remote=" + peer.getRemoteAddressString()
+      + ", for file " + file
+      + ", for pool " + block.getBlockPoolId()
+      + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
+    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
+  }
+  
+  @Override
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return TCP_WINDOW_SIZE;
+  }
+  
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+  
+  @Override
+  public boolean isShortCircuit() {
+    return false;
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
new file mode 100644
index 0000000..c9966a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
+import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ * 
+ * This data includes:
+ *  - the offset in bytes into the HDFS block of the data in this packet
+ *  - the sequence number of this packet in the pipeline
+ *  - whether or not this is the last packet in the pipeline
+ *  - the length of the data in this packet
+ *  - whether or not this packet should be synced by the DNs.
+ *  
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PacketHeader {
+  private static final int MAX_PROTO_SIZE = 
+    PacketHeaderProto.newBuilder()
+      .setOffsetInBlock(0)
+      .setSeqno(0)
+      .setLastPacketInBlock(false)
+      .setDataLen(0)
+      .setSyncBlock(false)
+      .build().getSerializedSize();
+  public static final int PKT_LENGTHS_LEN =
+      Ints.BYTES + Shorts.BYTES;
+  public static final int PKT_MAX_HEADER_LEN =
+      PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
+
+  private int packetLen;
+  private PacketHeaderProto proto;
+
+  public PacketHeader() {
+  }
+
+  public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+                      boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
+    this.packetLen = packetLen;
+    Preconditions.checkArgument(packetLen >= Ints.BYTES,
+        "packet len %s should always be at least 4 bytes",
+        packetLen);
+    
+    PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
+      .setOffsetInBlock(offsetInBlock)
+      .setSeqno(seqno)
+      .setLastPacketInBlock(lastPacketInBlock)
+      .setDataLen(dataLen);
+      
+    if (syncBlock) {
+      // Only set syncBlock if it is specified.
+      // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+      // because it changes the length of the packet header, and BlockReceiver
+      // in that version did not support variable-length headers.
+      builder.setSyncBlock(syncBlock);
+    }
+      
+    proto = builder.build();
+  }
+
+  public int getDataLen() {
+    return proto.getDataLen();
+  }
+
+  public boolean isLastPacketInBlock() {
+    return proto.getLastPacketInBlock();
+  }
+
+  public long getSeqno() {
+    return proto.getSeqno();
+  }
+
+  public long getOffsetInBlock() {
+    return proto.getOffsetInBlock();
+  }
+
+  public int getPacketLen() {
+    return packetLen;
+  }
+
+  public boolean getSyncBlock() {
+    return proto.getSyncBlock();
+  }
+
+  @Override
+  public String toString() {
+    return "PacketHeader with packetLen=" + packetLen +
+      " header data: " + 
+      proto.toString();
+  }
+  
+  public void setFieldsFromData(
+      int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+    this.packetLen = packetLen;
+    proto = PacketHeaderProto.parseFrom(headerData);
+  }
+  
+  public void readFields(ByteBuffer buf) throws IOException {
+    packetLen = buf.getInt();
+    short protoLen = buf.getShort();
+    byte[] data = new byte[protoLen];
+    buf.get(data);
+    proto = PacketHeaderProto.parseFrom(data);
+  }
+  
+  public void readFields(DataInputStream in) throws IOException {
+    this.packetLen = in.readInt();
+    short protoLen = in.readShort();
+    byte[] data = new byte[protoLen];
+    in.readFully(data);
+    proto = PacketHeaderProto.parseFrom(data);
+  }
+
+  /**
+   * @return the number of bytes necessary to write out this header,
+   * including the length-prefixing of the payload and header
+   */
+  public int getSerializedSize() {
+    return PKT_LENGTHS_LEN + proto.getSerializedSize();
+  }
+
+  /**
+   * Write the header into the buffer.
+   * This requires that {@link #getSerializedSize()} bytes remain in the buffer.
+   */
+  public void putInBuffer(final ByteBuffer buf) {
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+      : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    try {
+      buf.putInt(packetLen);
+      buf.putShort((short) proto.getSerializedSize());
+      proto.writeTo(new ByteBufferOutputStream(buf));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public void write(DataOutputStream out) throws IOException {
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+    : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    out.writeInt(packetLen);
+    out.writeShort(proto.getSerializedSize());
+    proto.writeTo(out);
+  }
+  
+  public byte[] getBytes() {
+    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+    putInBuffer(buf);
+    return buf.array();
+  }
+
+  /**
+   * Perform a sanity check on the packet, returning true if it is sane.
+   * @param lastSeqNo the previous sequence number received - we expect the current
+   * sequence number to be larger by 1.
+   */
+  public boolean sanityCheck(long lastSeqNo) {
+    // We should only have a non-positive data length for the last packet
+    if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
+    // The last packet should not contain data
+    if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
+    // Seqnos should always increase by 1 with each packet received
+    if (proto.getSeqno() != lastSeqNo + 1) return false;
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof PacketHeader)) return false;
+    PacketHeader other = (PacketHeader)o;
+    return this.proto.equals(other.proto);
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)proto.getSeqno();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
new file mode 100644
index 0000000..c4093b1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to handle reading packets one-at-a-time from the wire.
+ * These packets are used both for reading and writing data to/from
+ * DataNodes.
+ */
+@InterfaceAudience.Private
+public class PacketReceiver implements Closeable {
+
+  /**
+   * The max size of any single packet. This prevents OOMEs when
+   * invalid data is sent.
+   */
+  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+
+  static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);
+  
+  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+  private final boolean useDirectBuffers;
+
+  /**
+   * The entirety of the most recently read packet.
+   * The first PKT_LENGTHS_LEN bytes of this buffer are the
+   * length prefixes.
+   */
+  private ByteBuffer curPacketBuf = null;
+  
+  /**
+   * A slice of {@link #curPacketBuf} which contains just the checksums.
+   */
+  private ByteBuffer curChecksumSlice = null;
+  
+  /**
+   * A slice of {@link #curPacketBuf} which contains just the data.
+   */
+  private ByteBuffer curDataSlice = null;
+
+  /**
+   * The packet header of the most recently read packet.
+   */
+  private PacketHeader curHeader;
+  
+  public PacketReceiver(boolean useDirectBuffers) {
+    this.useDirectBuffers = useDirectBuffers;
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
+  }
+
+  public PacketHeader getHeader() {
+    return curHeader;
+  }
+
+  public ByteBuffer getDataSlice() {
+    return curDataSlice;
+  }
+  
+  public ByteBuffer getChecksumSlice() {
+    return curChecksumSlice;
+  }
+
+  /**
+   * Reads all of the data for the next packet into the appropriate buffers.
+   * 
+   * The data slice and checksum slice members will be set to point to the
+   * user data and corresponding checksums. The header will be parsed and
+   * set.
+   */
+  public void receiveNextPacket(ReadableByteChannel in) throws IOException {
+    doRead(in, null);
+  }
+
+  /**
+   * @see #receiveNextPacket(ReadableByteChannel)
+   */
+  public void receiveNextPacket(InputStream in) throws IOException {
+    doRead(null, in);
+  }
+
+  private void doRead(ReadableByteChannel ch, InputStream in)
+      throws IOException {
+    // Each packet looks like:
+    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
+    //   32-bit  16-bit   <protobuf>  <variable length>
+    //
+    // PLEN:      Payload length
+    //            = length(PLEN) + length(CHECKSUMS) + length(DATA)
+    //            This length includes its own encoded length in
+    //            the sum for historical reasons.
+    //
+    // HLEN:      Header length
+    //            = length(HEADER)
+    //
+    // HEADER:    the actual packet header fields, encoded in protobuf
+    // CHECKSUMS: the crcs for the data chunk. May be missing if
+    //            checksums were not requested
+    // DATA:      the actual block data
+    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
+
+    curPacketBuf.clear();
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+    doReadFully(ch, in, curPacketBuf);
+    curPacketBuf.flip();
+    int payloadLen = curPacketBuf.getInt();
+    
+    if (payloadLen < Ints.BYTES) {
+      // The "payload length" includes its own length. Therefore it
+      // should never be less than 4 bytes
+      throw new IOException("Invalid payload length " +
+          payloadLen);
+    }
+    int dataPlusChecksumLen = payloadLen - Ints.BYTES;
+    int headerLen = curPacketBuf.getShort();
+    if (headerLen < 0) {
+      throw new IOException("Invalid header length " + headerLen);
+    }
+    
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
+          " headerLen = " + headerLen);
+    }
+    
+    // Sanity check the buffer size so we don't allocate too much memory
+    // and OOME.
+    int totalLen = payloadLen + headerLen;
+    if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
+      throw new IOException("Incorrect value for packet payload size: " +
+                            payloadLen);
+    }
+
+    // Make sure we have space for the whole packet, and
+    // read it.
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
+    curPacketBuf.clear();
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
+    doReadFully(ch, in, curPacketBuf);
+    curPacketBuf.flip();
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+
+    // Extract the header from the front of the buffer (after the length prefixes)
+    byte[] headerBuf = new byte[headerLen];
+    curPacketBuf.get(headerBuf);
+    if (curHeader == null) {
+      curHeader = new PacketHeader();
+    }
+    curHeader.setFieldsFromData(payloadLen, headerBuf);
+    
+    // Compute the sub-slices of the packet
+    int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
+    if (checksumLen < 0) {
+      throw new IOException("Invalid packet: data length in packet header " + 
+          "exceeds data length received. dataPlusChecksumLen=" +
+          dataPlusChecksumLen + " header: " + curHeader); 
+    }
+    
+    reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
+  }
+  
+  /**
+   * Rewrite the last-read packet on the wire to the given output stream.
+   */
+  public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
+    Preconditions.checkState(!useDirectBuffers,
+        "Currently only supported for non-direct buffers");
+    mirrorOut.write(curPacketBuf.array(),
+        curPacketBuf.arrayOffset(),
+        curPacketBuf.remaining());
+  }
+
+  
+  private static void doReadFully(ReadableByteChannel ch, InputStream in,
+      ByteBuffer buf) throws IOException {
+    if (ch != null) {
+      readChannelFully(ch, buf);
+    } else {
+      Preconditions.checkState(!buf.isDirect(),
+          "Must not use direct buffers with InputStream API");
+      IOUtils.readFully(in, buf.array(),
+          buf.arrayOffset() + buf.position(),
+          buf.remaining());
+      buf.position(buf.position() + buf.remaining());
+    }
+  }
+
+  private void reslicePacket(
+      int headerLen, int checksumsLen, int dataLen) {
+    // Packet structure (refer to doRead() for details):
+    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
+    //   32-bit  16-bit   <protobuf>  <variable length>
+    //   |--- lenThroughHeader ----|
+    //   |----------- lenThroughChecksums   ----|
+    //   |------------------- lenThroughData    ------| 
+    int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+    int lenThroughChecksums = lenThroughHeader + checksumsLen;
+    int lenThroughData = lenThroughChecksums + dataLen;
+
+    assert dataLen >= 0 : "invalid datalen: " + dataLen;
+    assert curPacketBuf.position() == lenThroughHeader;
+    assert curPacketBuf.limit() == lenThroughData :
+      "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
+      " rem=" + curPacketBuf.remaining();
+
+    // Slice the checksums.
+    curPacketBuf.position(lenThroughHeader);
+    curPacketBuf.limit(lenThroughChecksums);
+    curChecksumSlice = curPacketBuf.slice();
+
+    // Slice the data.
+    curPacketBuf.position(lenThroughChecksums);
+    curPacketBuf.limit(lenThroughData);
+    curDataSlice = curPacketBuf.slice();
+    
+    // Reset buffer to point to the entirety of the packet (including
+    // length prefixes)
+    curPacketBuf.position(0);
+    curPacketBuf.limit(lenThroughData);
+  }
+
+  
+  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
+      throws IOException {
+    while (buf.remaining() > 0) {
+      int n = ch.read(buf);
+      if (n < 0) {
+        throw new IOException("Premature EOF reading from " + ch);
+      }
+    }
+  }
+  
+  private void reallocPacketBuf(int atLeastCapacity) {
+    // Realloc the buffer if this packet is longer than the previous
+    // one.
+    if (curPacketBuf == null ||
+        curPacketBuf.capacity() < atLeastCapacity) {
+      ByteBuffer newBuf;
+      if (useDirectBuffers) {
+        newBuf = bufferPool.getBuffer(atLeastCapacity);
+      } else {
+        newBuf = ByteBuffer.allocate(atLeastCapacity);
+      }
+      // If reallocing an existing buffer, copy the old packet length
+      // prefixes over
+      if (curPacketBuf != null) {
+        curPacketBuf.flip();
+        newBuf.put(curPacketBuf);
+      }
+      
+      returnPacketBufToPool();
+      curPacketBuf = newBuf;
+    }
+  }
+  
+  private void returnPacketBufToPool() {
+    if (curPacketBuf != null && curPacketBuf.isDirect()) {
+      bufferPool.returnBuffer(curPacketBuf);
+      curPacketBuf = null;
+    }
+  }
+
+  @Override // Closeable
+  public void close() {
+    returnPacketBufToPool();
+  }
+  
+  @Override
+  protected void finalize() throws Throwable {
+    try {
+      // just in case it didn't get closed, we
+      // may as well still try to return the buffer
+      returnPacketBufToPool();
+    } finally {
+      super.finalize();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
new file mode 100644
index 0000000..31d4dcc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * OutputStream that writes into a {@link ByteBuffer}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ByteBufferOutputStream extends OutputStream {
+
+  private final ByteBuffer buf;
+
+  public ByteBufferOutputStream(ByteBuffer buf) {
+    this.buf = buf;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    buf.put((byte)b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    buf.put(b, off, len);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7b5979e..ef8fac5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -867,6 +867,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8980. Remove unnecessary block replacement in INodeFile. (jing9)
 
+    HDFS-8990. Move RemoteBlockReader to hdfs-client module.
+    (Mingliang via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3c49ef7..268a5b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -203,7 +203,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
-  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
 
   private final Configuration conf;
   private final DfsClientConf dfsClientConf;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
deleted file mode 100644
index 015e154..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-
-/**
- * @deprecated this is an old implementation that is being left around
- * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
- * It will be removed in the next release.
- */
-@InterfaceAudience.Private
-@Deprecated
-public class RemoteBlockReader extends FSInputChecker implements BlockReader {
-  private final Peer peer;
-  private final DatanodeID datanodeID;
-  private final DataInputStream in;
-  private DataChecksum checksum;
-
-  /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-
-  private final long blockId;
-
-  /** offset in block of the first chunk - may be less than startOffset
-      if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private final long bytesNeededToFinish;
-  
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-  
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-  
-  private final PeerCache peerCache;
-  
-  /* FSInputChecker interface */
-  
-  /* same interface as inputStream java.io.InputStream#read()
-   * used by DFSInputStream#read()
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read"
-   * because it first reads the data to user buffer and then checks
-   * the checksum.
-   */
-  @Override
-  public synchronized int read(byte[] buf, int off, int len) 
-                               throws IOException {
-    
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
-
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( super.readAndDiscard(toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
-      }
-    }
-    
-    int nRead = super.read(buf, off, len);
-
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(peer, Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(peer, Status.SUCCESS);
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in the majority of cases? This one throws. */
-    long nSkipped = 0;
-    while (nSkipped < n) {
-      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
-      int ret = readAndDiscard(toSkip);
-      if (ret <= 0) {
-        return nSkipped;
-      }
-      nSkipped += ret;
-    }
-    return nSkipped;
-  }
-
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-                          "Use read(buf, off, len) instead.");
-  }
-  
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader. 
-     * DFSInputStream does not always call 'seekToNewSource'. In the 
-     * case of pread(), it just tries a different replica without seeking.
-     */ 
-    return false;
-  }
-  
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
-
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-                               "since seek is not required");
-  }
-  
-  /**
-   * Makes sure that checksumBytes has enough capacity 
-   * and limit is set to the number of checksum bytes needed 
-   * to be read.
-   */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize = 
-      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
-  
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
-                                       int len, byte[] checksumBuf) 
-                                       throws IOException {
-    TraceScope scope =
-        Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
-            Sampler.NEVER);
-    try {
-      return readChunkImpl(pos, buf, offset, len, checksumBuf);
-    } finally {
-      scope.close();
-    }
-  }
-
-  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
-                                     int len, byte[] checksumBuf)
-                                     throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
-    }
-    
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
-    }
-    
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " + 
-                            firstChunkOffset + " != " + chunkOffset);
-    }
-
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DFSClient readChunk got header " + header);
-      }
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-           throw new IOException("BlockReader: error in packet header " +
-                                 header);
-      }
-
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-                          checksumBytes.limit());
-      }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
-
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-                                  checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-        checksumsToRead * bytesPerChecksum, // full chunks
-        dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-    }
-
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      PacketHeader hdr = new PacketHeader();
-      hdr.readFields(in);
-
-      if (!hdr.isLastPacketInBlock() ||
-          hdr.getDataLen() != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-                              hdr);
-      }
-
-      eos = true;
-    }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
-  }
-  
-  private RemoteBlockReader(String file, String bpid, long blockId,
-      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache) {
-    // Path is used only for printing block and file information in debug
-    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
-                    ":" + bpid + ":of:"+ file)/*too non path-like?*/,
-          1, verifyChecksum,
-          checksum.getChecksumSize() > 0? checksum : null, 
-          checksum.getBytesPerChecksum(),
-          checksum.getChecksumSize());
-
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
-    
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = in;
-    this.checksum = checksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.peerCache = peerCache;
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance, or null on error.
-   */
-  public static RemoteBlockReader newBlockReader(String file,
-                                     ExtendedBlock block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
-                                     String clientName, Peer peer,
-                                     DatanodeID datanodeID,
-                                     PeerCache peerCache,
-                                     CachingStrategy cachingStrategy)
-                                       throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-    
-    //
-    // Get bytes in block, set streams
-    //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(peer.getInputStream(), bufferSize));
-    
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-      status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    //Warning when we get CHECKSUM_NULL?
-    
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-    
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-                            firstChunkOffset + ") startOffset is " + 
-                            startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
-        peer, datanodeID, peerCache);
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null & sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-  
-  @Override
-  public void readFully(byte[] buf, int readOffset, int amtToRead)
-      throws IOException {
-    IOUtils.readFully(this, buf, readOffset, amtToRead);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Peer peer, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
-  }
-  
-  @Override
-  public int available() throws IOException {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return DFSClient.TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-  
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-}