Posted to common-commits@hadoop.apache.org by um...@apache.org on 2016/03/01 06:53:42 UTC

hadoop git commit: HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum. Contributed by Kai Zheng

Repository: hadoop
Updated Branches:
  refs/heads/trunk 680f3fc02 -> 307ec80ac


HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum. Contributed by Kai Zheng


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/307ec80a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/307ec80a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/307ec80a

Branch: refs/heads/trunk
Commit: 307ec80acae3b4a41d21b2d4b3a55032e55fcdc6
Parents: 680f3fc
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Mon Feb 29 21:52:20 2016 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Mon Feb 29 21:52:20 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/io/IOUtils.java |   4 +-
 .../main/java/org/apache/hadoop/io/MD5Hash.java |  11 +
 .../org/apache/hadoop/util/DataChecksum.java    |  12 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 237 ++---------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |  49 +++
 .../apache/hadoop/hdfs/FileChecksumHelper.java  | 416 +++++++++++++++++++
 .../protocol/datatransfer/IOStreamPair.java     |  11 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/datanode/BlockChecksumHelper.java    | 254 +++++++++++
 .../hdfs/server/datanode/BlockSender.java       |   1 -
 .../hadoop/hdfs/server/datanode/DataNode.java   |  10 +
 .../hdfs/server/datanode/DataXceiver.java       | 162 +++-----
 12 files changed, 846 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
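For orientation, here is a minimal sketch of the refactored client-side call path that the DFSClient.java and FileChecksumHelper.java hunks below introduce. The wrapper method is hypothetical and assumes the hadoop-hdfs-client classes on the classpath; the helper class, computer, and accessor names are taken from the diff.

  // Sketch only: how DFSClient#getFileChecksum now delegates to the new helper.
  MD5MD5CRC32FileChecksum getFileChecksumSketch(
      String src, long length, LocatedBlocks blockLocations,
      ClientProtocol namenode, DFSClient client) throws IOException {
    FileChecksumHelper.FileChecksumComputer maker =
        new FileChecksumHelper.ReplicatedFileChecksumComputer(
            src, length, blockLocations, namenode, client);
    maker.compute();                 // checksums each block, then aggregates
    return maker.getFileChecksum();
  }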


http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
index 451163c..2588bf1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
@@ -261,7 +261,9 @@ public class IOUtils {
    * @param stream the Stream to close
    */
   public static void closeStream(java.io.Closeable stream) {
-    cleanup(null, stream);
+    if (stream != null) {
+      cleanup(null, stream);
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
index 822e089..aaf3ea1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
@@ -128,6 +128,17 @@ public class MD5Hash implements WritableComparable<MD5Hash> {
     return new MD5Hash(digest);
   }
 
+  /** Construct a hash value for an array of byte arrays. */
+  public static MD5Hash digest(byte[][] dataArr, int start, int len) {
+    byte[] digest;
+    MessageDigest digester = getDigester();
+    for (byte[] data : dataArr) {
+      digester.update(data, start, len);
+    }
+    digest = digester.digest();
+    return new MD5Hash(digest);
+  }
+
   /** Construct a hash value for a String. */
   public static MD5Hash digest(String string) {
     return digest(UTF8.getBytes(string));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index faac587..e44b64d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -45,7 +45,7 @@ public class DataChecksum implements Checksum {
   public static final int CHECKSUM_MIXED   = 4;
  
   /** The checksum types */
-  public static enum Type {
+  public enum Type {
     NULL  (CHECKSUM_NULL, 0),
     CRC32 (CHECKSUM_CRC32, 4),
     CRC32C(CHECKSUM_CRC32C, 4),
@@ -55,7 +55,7 @@ public class DataChecksum implements Checksum {
     public final int id;
     public final int size;
     
-    private Type(int id, int size) {
+    Type(int id, int size) {
       this.id = id;
       this.size = size;
     }
@@ -230,17 +230,21 @@ public class DataChecksum implements Checksum {
   public Type getChecksumType() {
     return type;
   }
+
   /** @return the size for a checksum. */
   public int getChecksumSize() {
     return type.size;
   }
+
   /** @return the required checksum size given the data length. */
   public int getChecksumSize(int dataSize) {
     return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize(); 
   }
+
   public int getBytesPerChecksum() {
     return bytesPerChecksum;
   }
+
   public int getNumBytesInSum() {
     return inSum;
   }
@@ -249,16 +253,19 @@ public class DataChecksum implements Checksum {
   static public int getChecksumHeaderSize() {
     return 1 + SIZE_OF_INTEGER; // type byte, bytesPerChecksum int
   }
+
   //Checksum Interface. Just a wrapper around member summer.
   @Override
   public long getValue() {
     return summer.getValue();
   }
+
   @Override
   public void reset() {
     summer.reset();
     inSum = 0;
   }
+
   @Override
   public void update( byte[] b, int off, int len ) {
     if ( len > 0 ) {
@@ -266,6 +273,7 @@ public class DataChecksum implements Checksum {
       inSum += len;
     }
   }
+
   @Override
   public void update( int b ) {
     summer.update( b );

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 15a49f1..da3f745 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -27,12 +27,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCA
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
 
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -80,9 +77,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.FsTracer;
 import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -138,7 +133,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -146,20 +140,16 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactor
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.util.IOUtilsClient;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.RPC;
@@ -1293,7 +1283,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   /**
    * Invoke namenode append RPC.
-   * It retries in case of {@link BlockNotYetCompleteException}.
+   * It retries in case of some {@link RetriableException}.
    */
   private LastBlockWithStatus callAppend(String src,
       EnumSetWritable<CreateFlag> flag) throws IOException {
@@ -1695,7 +1685,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /**
-   * Get the checksum of the whole file of a range of the file. Note that the
+   * Get the checksum of the whole file or a range of the file. Note that the
    * range always starts from the beginning of the file.
    * @param src The file path
    * @param length the length of the range, i.e., the range is [0, length]
@@ -1706,9 +1696,23 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       throws IOException {
     checkOpen();
     Preconditions.checkArgument(length >= 0);
+
+    LocatedBlocks blockLocations = getBlockLocations(src, length);
+
+    FileChecksumHelper.FileChecksumComputer maker =
+        new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
+            blockLocations, namenode, this);
+
+    maker.compute();
+
+    return maker.getFileChecksum();
+  }
+
+  protected LocatedBlocks getBlockLocations(String src,
+                                            long length) throws IOException {
     //get block locations for the file range
-    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
-        length);
+    LocatedBlocks blockLocations = callGetBlockLocations(namenode,
+        src, 0, length);
     if (null == blockLocations) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
@@ -1716,194 +1720,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       throw new IOException("Fail to get checksum, since file " + src
           + " is under construction.");
     }
-    List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
-    final DataOutputBuffer md5out = new DataOutputBuffer();
-    int bytesPerCRC = -1;
-    DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
-    long crcPerBlock = 0;
-    boolean refetchBlocks = false;
-    int lastRetriedIndex = -1;
-
-    // get block checksum for each block
-    long remaining = length;
-    if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
-      remaining = Math.min(length, blockLocations.getFileLength());
-    }
-    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
-      if (refetchBlocks) {  // refetch to get fresh tokens
-        blockLocations = callGetBlockLocations(namenode, src, 0, length);
-        if (null == blockLocations) {
-          throw new FileNotFoundException("File does not exist: " + src);
-        }
-        if (blockLocations.isUnderConstruction()) {
-          throw new IOException("Fail to get checksum, since file " + src
-              + " is under construction.");
-        }
-        locatedblocks = blockLocations.getLocatedBlocks();
-        refetchBlocks = false;
-      }
-      LocatedBlock lb = locatedblocks.get(i);
-      final ExtendedBlock block = lb.getBlock();
-      if (remaining < block.getNumBytes()) {
-        block.setNumBytes(remaining);
-      }
-      remaining -= block.getNumBytes();
-      final DatanodeInfo[] datanodes = lb.getLocations();
-
-      //try each datanode location of the block
-      final int timeout = 3000 * datanodes.length +
-          dfsClientConf.getSocketTimeout();
-      boolean done = false;
-      for(int j = 0; !done && j < datanodes.length; j++) {
-        DataOutputStream out = null;
-        DataInputStream in = null;
-
-        try {
-          //connect to a datanode
-          IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
-          out = new DataOutputStream(new BufferedOutputStream(pair.out,
-              smallBufferSize));
-          in = new DataInputStream(pair.in);
-
-          LOG.debug("write to {}: {}, block={}",
-              datanodes[j], Op.BLOCK_CHECKSUM, block);
-          // get block MD5
-          new Sender(out).blockChecksum(block, lb.getBlockToken());
-
-          final BlockOpResponseProto reply =
-              BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
-
-          String logInfo = "for block " + block + " from datanode " +
-              datanodes[j];
-          DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
-
-          OpBlockChecksumResponseProto checksumData =
-              reply.getChecksumResponse();
-
-          //read byte-per-checksum
-          final int bpc = checksumData.getBytesPerCrc();
-          if (i == 0) { //first block
-            bytesPerCRC = bpc;
-          }
-          else if (bpc != bytesPerCRC) {
-            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-                + " but bytesPerCRC=" + bytesPerCRC);
-          }
-
-          //read crc-per-block
-          final long cpb = checksumData.getCrcPerBlock();
-          if (locatedblocks.size() > 1 && i == 0) {
-            crcPerBlock = cpb;
-          }
-
-          //read md5
-          final MD5Hash md5 = new MD5Hash(
-              checksumData.getMd5().toByteArray());
-          md5.write(md5out);
-
-          // read crc-type
-          final DataChecksum.Type ct;
-          if (checksumData.hasCrcType()) {
-            ct = PBHelperClient.convert(checksumData
-                .getCrcType());
-          } else {
-            LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-                "inferring checksum by reading first byte");
-            ct = inferChecksumTypeByReading(lb, datanodes[j]);
-          }
-
-          if (i == 0) { // first block
-            crcType = ct;
-          } else if (crcType != DataChecksum.Type.MIXED
-              && crcType != ct) {
-            // if crc types are mixed in a file
-            crcType = DataChecksum.Type.MIXED;
-          }
-
-          done = true;
-
-          if (LOG.isDebugEnabled()) {
-            if (i == 0) {
-              LOG.debug("set bytesPerCRC=" + bytesPerCRC
-                  + ", crcPerBlock=" + crcPerBlock);
-            }
-            LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
-          }
-        } catch (InvalidBlockTokenException ibte) {
-          if (i > lastRetriedIndex) {
-            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
-                    + "for file {} for block {} from datanode {}. Will retry "
-                    + "the block once.",
-                src, block, datanodes[j]);
-            lastRetriedIndex = i;
-            done = true; // actually it's not done; but we'll retry
-            i--; // repeat at i-th block
-            refetchBlocks = true;
-            break;
-          }
-        } catch (IOException ie) {
-          LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
-        } finally {
-          IOUtils.closeStream(in);
-          IOUtils.closeStream(out);
-        }
-      }
-
-      if (!done) {
-        throw new IOException("Fail to get block MD5 for " + block);
-      }
-    }
 
-    //compute file MD5
-    final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
-    switch (crcType) {
-    case CRC32:
-      return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
-          crcPerBlock, fileMD5);
-    case CRC32C:
-      return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
-          crcPerBlock, fileMD5);
-    default:
-      // If there is no block allocated for the file,
-      // return one with the magic entry that matches what previous
-      // hdfs versions return.
-      if (locatedblocks.size() == 0) {
-        return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
-      }
-
-      // we should never get here since the validity was checked
-      // when getCrcType() was called above.
-      return null;
-    }
+    return blockLocations;
   }
 
-  /**
-   * Connect to the given datanode's datantrasfer port, and return
-   * the resulting IOStreamPair. This includes encryption wrapping, etc.
-   */
-  private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
-      LocatedBlock lb) throws IOException {
-    boolean success = false;
-    Socket sock = null;
-    try {
-      sock = socketFactory.createSocket();
-      String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
-      LOG.debug("Connecting to datanode {}", dnAddr);
-      NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
-      sock.setTcpNoDelay(dfsClientConf.getDataTransferTcpNoDelay());
-      sock.setSoTimeout(timeout);
-
-      OutputStream unbufOut = NetUtils.getOutputStream(sock);
-      InputStream unbufIn = NetUtils.getInputStream(sock);
-      IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
-          lb.getBlockToken(), dn);
-      success = true;
-      return ret;
-    } finally {
-      if (!success) {
-        IOUtils.closeSocket(sock);
-      }
-    }
+  protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+                                     Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+    return DFSUtilClient.connectToDN(dn, timeout, conf, saslClient,
+        socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken);
   }
 
   /**
@@ -1917,19 +1742,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    * @return the inferred checksum type
    * @throws IOException if an error occurs
    */
-  private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
+  protected Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
       throws IOException {
-    IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
+    IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(),
+        lb.getBlockToken());
 
     try {
-      DataOutputStream out = new DataOutputStream(
-          new BufferedOutputStream(pair.out, smallBufferSize));
-      DataInputStream in = new DataInputStream(pair.in);
-
-      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
+      new Sender((DataOutputStream) pair.out).readBlock(lb.getBlock(),
+          lb.getBlockToken(), clientName,
           0, 1, true, CachingStrategy.newDefaultStrategy());
       final BlockOpResponseProto reply =
-          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
+          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(pair.in));
       String logInfo = "trying to read " + lb.getBlock() + " from datanode " +
           dn;
       DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index d646252..880234e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -56,8 +58,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.SocketFactory;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -717,4 +724,46 @@ public class DFSUtilClient {
       return corruptionMap;
     }
   }
+
+  /**
+   * Connect to the given datanode's data transfer port, and return
+   * the resulting IOStreamPair. This includes encryption wrapping, etc.
+   */
+  public static IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+                                         Configuration conf,
+                                         SaslDataTransferClient saslClient,
+                                         SocketFactory socketFactory,
+                                         boolean connectToDnViaHostname,
+                                         DataEncryptionKeyFactory dekFactory,
+                                         Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = socketFactory.createSocket();
+      String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+      LOG.debug("Connecting to datanode {}", dnAddr);
+      NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+      sock.setSoTimeout(timeout);
+
+      OutputStream unbufOut = NetUtils.getOutputStream(sock);
+      InputStream unbufIn = NetUtils.getInputStream(sock);
+      IOStreamPair pair = saslClient.newSocketSend(sock, unbufOut,
+          unbufIn, dekFactory, blockToken, dn);
+
+      IOStreamPair result = new IOStreamPair(
+          new DataInputStream(pair.in),
+          new DataOutputStream(new BufferedOutputStream(pair.out,
+              DFSUtilClient.getSmallBufferSize(conf)))
+      );
+
+      success = true;
+      return result;
+    } finally {
+      if (!success) {
+        IOUtils.closeSocket(sock);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
new file mode 100644
index 0000000..d15db9f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility classes to compute file checksum for both replicated and striped
+ * files.
+ */
+final class FileChecksumHelper {
+  static final Logger LOG =
+      LoggerFactory.getLogger(FileChecksumHelper.class);
+
+  private FileChecksumHelper() {}
+
+  /**
+   * A common abstract class to compute file checksum.
+   */
+  static abstract class FileChecksumComputer {
+    private final String src;
+    private final long length;
+    private final DFSClient client;
+    private final ClientProtocol namenode;
+    private final DataOutputBuffer md5out = new DataOutputBuffer();
+
+    private MD5MD5CRC32FileChecksum fileChecksum;
+    private LocatedBlocks blockLocations;
+
+    private int timeout;
+    private List<LocatedBlock> locatedBlocks;
+    private long remaining = 0L;
+
+    private int bytesPerCRC = -1;
+    private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
+    private long crcPerBlock = 0;
+    private boolean refetchBlocks = false;
+    private int lastRetriedIndex = -1;
+
+    /**
+     * Constructor that accepts all the input parameters for the computing.
+     */
+    FileChecksumComputer(String src, long length,
+                         LocatedBlocks blockLocations,
+                         ClientProtocol namenode,
+                         DFSClient client) throws IOException {
+      this.src = src;
+      this.length = length;
+      this.blockLocations = blockLocations;
+      this.namenode = namenode;
+      this.client = client;
+
+      this.remaining = length;
+      if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
+        this.remaining = Math.min(length, blockLocations.getFileLength());
+      }
+
+      this.locatedBlocks = blockLocations.getLocatedBlocks();
+    }
+
+    String getSrc() {
+      return src;
+    }
+
+    long getLength() {
+      return length;
+    }
+
+    DFSClient getClient() {
+      return client;
+    }
+
+    ClientProtocol getNamenode() {
+      return namenode;
+    }
+
+    DataOutputBuffer getMd5out() {
+      return md5out;
+    }
+
+    MD5MD5CRC32FileChecksum getFileChecksum() {
+      return fileChecksum;
+    }
+
+    LocatedBlocks getBlockLocations() {
+      return blockLocations;
+    }
+
+    void setBlockLocations(LocatedBlocks blockLocations) {
+      this.blockLocations = blockLocations;
+    }
+
+    int getTimeout() {
+      return timeout;
+    }
+
+    void setTimeout(int timeout) {
+      this.timeout = timeout;
+    }
+
+    List<LocatedBlock> getLocatedBlocks() {
+      return locatedBlocks;
+    }
+
+    void setLocatedBlocks(List<LocatedBlock> locatedBlocks) {
+      this.locatedBlocks = locatedBlocks;
+    }
+
+    long getRemaining() {
+      return remaining;
+    }
+
+    void setRemaining(long remaining) {
+      this.remaining = remaining;
+    }
+
+    int getBytesPerCRC() {
+      return bytesPerCRC;
+    }
+
+    void setBytesPerCRC(int bytesPerCRC) {
+      this.bytesPerCRC = bytesPerCRC;
+    }
+
+    DataChecksum.Type getCrcType() {
+      return crcType;
+    }
+
+    void setCrcType(DataChecksum.Type crcType) {
+      this.crcType = crcType;
+    }
+
+    long getCrcPerBlock() {
+      return crcPerBlock;
+    }
+
+    void setCrcPerBlock(long crcPerBlock) {
+      this.crcPerBlock = crcPerBlock;
+    }
+
+    boolean isRefetchBlocks() {
+      return refetchBlocks;
+    }
+
+    void setRefetchBlocks(boolean refetchBlocks) {
+      this.refetchBlocks = refetchBlocks;
+    }
+
+    int getLastRetriedIndex() {
+      return lastRetriedIndex;
+    }
+
+    void setLastRetriedIndex(int lastRetriedIndex) {
+      this.lastRetriedIndex = lastRetriedIndex;
+    }
+
+    /**
+     * Perform the file checksum computing. The intermediate results are stored
+     * in the object and will be used later.
+     * @throws IOException
+     */
+    void compute() throws IOException {
+      checksumBlocks();
+
+      fileChecksum = makeFinalResult();
+    }
+
+    /**
+     * Compute and aggregate block checksums block by block.
+     * @throws IOException
+     */
+    abstract void checksumBlocks() throws IOException;
+
+    /**
+     * Make the final file checksum result once the per-block computation is done.
+     */
+    MD5MD5CRC32FileChecksum makeFinalResult() {
+      //compute file MD5
+      final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
+      switch (crcType) {
+      case CRC32:
+        return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      case CRC32C:
+        return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      default:
+        // If there is no block allocated for the file,
+        // return one with the magic entry that matches what previous
+        // hdfs versions return.
+        if (locatedBlocks.isEmpty()) {
+          return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
+        }
+
+        // we should never get here since the validity was checked
+        // when getCrcType() was called above.
+        return null;
+      }
+    }
+
+    /**
+     * Create and return a sender given an IO stream pair.
+     */
+    Sender createSender(IOStreamPair pair) {
+      DataOutputStream out = (DataOutputStream) pair.out;
+      return new Sender(out);
+    }
+
+    /**
+     * Close an IO stream pair.
+     */
+    void close(IOStreamPair pair) {
+      if (pair != null) {
+        IOUtils.closeStream(pair.in);
+        IOUtils.closeStream(pair.out);
+      }
+    }
+  }
+
+  /**
+   * Replicated file checksum computer.
+   */
+  static class ReplicatedFileChecksumComputer extends FileChecksumComputer {
+    private int blockIdx;
+
+    ReplicatedFileChecksumComputer(String src, long length,
+                                   LocatedBlocks blockLocations,
+                                   ClientProtocol namenode,
+                                   DFSClient client) throws IOException {
+      super(src, length, blockLocations, namenode, client);
+    }
+
+    @Override
+    void checksumBlocks() throws IOException {
+      // get block checksum for each block
+      for (blockIdx = 0;
+           blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
+           blockIdx++) {
+        if (isRefetchBlocks()) {  // refetch to get fresh tokens
+          setBlockLocations(getClient().getBlockLocations(getSrc(),
+              getLength()));
+          setLocatedBlocks(getBlockLocations().getLocatedBlocks());
+          setRefetchBlocks(false);
+        }
+
+        LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
+
+        if (!checksumBlock(locatedBlock)) {
+          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+        }
+      }
+    }
+
+    /**
+     * Return true when it is fine to continue or retry, false on a severe
+     * condition or total failure.
+     */
+    private boolean checksumBlock(
+        LocatedBlock locatedBlock) throws IOException {
+      ExtendedBlock block = locatedBlock.getBlock();
+      if (getRemaining() < block.getNumBytes()) {
+        block.setNumBytes(getRemaining());
+      }
+      setRemaining(getRemaining() - block.getNumBytes());
+
+      DatanodeInfo[] datanodes = locatedBlock.getLocations();
+
+      int tmpTimeout = 3000 * datanodes.length +
+          getClient().getConf().getSocketTimeout();
+      setTimeout(tmpTimeout);
+
+      //try each datanode location of the block
+      boolean done = false;
+      for (int j = 0; !done && j < datanodes.length; j++) {
+        try {
+          tryDatanode(locatedBlock, datanodes[j]);
+          done = true;
+        } catch (InvalidBlockTokenException ibte) {
+          if (blockIdx > getLastRetriedIndex()) {
+            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                    + "for file {} for block {} from datanode {}. Will retry "
+                    + "the block once.",
+                getSrc(), block, datanodes[j]);
+            setLastRetriedIndex(blockIdx);
+            done = true; // actually it's not done; but we'll retry
+            blockIdx--; // repeat at blockIdx-th block
+            setRefetchBlocks(true);
+          }
+        } catch (IOException ie) {
+          LOG.warn("src={}" + ", datanodes[{}]={}",
+              getSrc(), j, datanodes[j], ie);
+        }
+      }
+
+      return done;
+    }
+
+    /**
+     * Try one replica or datanode to compute the block checksum given a block.
+     */
+    private void tryDatanode(LocatedBlock locatedBlock,
+                             DatanodeInfo datanode) throws IOException {
+
+      ExtendedBlock block = locatedBlock.getBlock();
+
+      try (IOStreamPair pair = getClient().connectToDN(datanode, getTimeout(),
+          locatedBlock.getBlockToken())) {
+
+        LOG.debug("write to {}: {}, block={}", datanode,
+            Op.BLOCK_CHECKSUM, block);
+
+        // get block MD5
+        createSender(pair).blockChecksum(block,
+            locatedBlock.getBlockToken());
+
+        final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
+            PBHelperClient.vintPrefixed(pair.in));
+
+        String logInfo = "for block " + block + " from datanode " +
+            datanode;
+        DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+        OpBlockChecksumResponseProto checksumData =
+            reply.getChecksumResponse();
+
+        //read byte-per-checksum
+        final int bpc = checksumData.getBytesPerCrc();
+        if (blockIdx == 0) { //first block
+          setBytesPerCRC(bpc);
+        } else if (bpc != getBytesPerCRC()) {
+          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+              + " but bytesPerCRC=" + getBytesPerCRC());
+        }
+
+        //read crc-per-block
+        final long cpb = checksumData.getCrcPerBlock();
+        if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
+          setCrcPerBlock(cpb);
+        }
+
+        //read md5
+        final MD5Hash md5 = new MD5Hash(
+            checksumData.getMd5().toByteArray());
+        md5.write(getMd5out());
+
+        // read crc-type
+        final DataChecksum.Type ct;
+        if (checksumData.hasCrcType()) {
+          ct = PBHelperClient.convert(checksumData
+              .getCrcType());
+        } else {
+          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+              "inferring checksum by reading first byte");
+          ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
+        }
+
+        if (blockIdx == 0) { // first block
+          setCrcType(ct);
+        } else if (getCrcType() != DataChecksum.Type.MIXED
+            && getCrcType() != ct) {
+          // if crc types are mixed in a file
+          setCrcType(DataChecksum.Type.MIXED);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          if (blockIdx == 0) {
+            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+                + ", crcPerBlock=" + getCrcPerBlock());
+          }
+          LOG.debug("got reply from " + datanode + ": md5=" + md5);
+        }
+      }
+    }
+  }
+}
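
A note on what the computer above produces: each datanode returns the MD5 of its block's stored CRCs, the client appends those per-block MD5s to md5out in block order, and makeFinalResult() takes the MD5 of that buffer to form the MD5-of-MD5s file checksum. A minimal, self-contained sketch of that aggregation step (illustration only, not code from the patch):

  import java.security.MessageDigest;

  static byte[] md5OfBlockMd5s(byte[][] perBlockMd5s) throws Exception {
    MessageDigest digester = MessageDigest.getInstance("MD5");
    for (byte[] blockMd5 : perBlockMd5s) {
      digester.update(blockMd5);    // same order as the blocks in the file
    }
    return digester.digest();       // the "MD5 of per-block MD5s"
  }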

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
index 4157a30..4ec73e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * A little struct class to wrap an InputStream and an OutputStream.
  */
 @InterfaceAudience.Private
-public class IOStreamPair {
+public class IOStreamPair implements Closeable {
   public final InputStream in;
   public final OutputStream out;
 
@@ -34,4 +37,10 @@ public class IOStreamPair {
     this.in = in;
     this.out = out;
   }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.closeStream(in);
+    IOUtils.closeStream(out);
+  }
 }
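
Since IOStreamPair now implements Closeable, callers can manage both streams with try-with-resources instead of closing in and out separately in a finally block, which is what the new FileChecksumHelper#tryDatanode does. A minimal sketch of the pattern (the streams below are placeholders, not the real datanode connection):

  try (IOStreamPair pair = new IOStreamPair(
      new java.io.ByteArrayInputStream(new byte[0]),
      new java.io.ByteArrayOutputStream())) {
    // ... use pair.out to send a request, pair.in to read the reply ...
  }  // IOStreamPair#close() closes both streams here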

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c3ea5ce..bb847a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -233,6 +233,9 @@ Trunk (Unreleased)
 
     HDFS-9838. Refactor the excessReplicateMap to a class.  (szetszwo)
 
+    HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum
+    (Kai Zheng via umamahesh)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
new file mode 100644
index 0000000..9a5552d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+
+/**
+ * Utilities for Block checksum computing, for both replicated and striped
+ * blocks.
+ */
+final class BlockChecksumHelper {
+
+  static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
+
+  private BlockChecksumHelper() {}
+
+  /**
+   * The abstract base block checksum computer.
+   */
+  static abstract class BlockChecksumComputer {
+    private final DataNode datanode;
+    private final ExtendedBlock block;
+    // client side now can specify a range of the block for checksum
+    private final long requestLength;
+    private final LengthInputStream metadataIn;
+    private final DataInputStream checksumIn;
+    private final long visibleLength;
+    private final boolean partialBlk;
+
+    private byte[] outBytes;
+    private int bytesPerCRC = -1;
+    private DataChecksum.Type crcType = null;
+    private long crcPerBlock = -1;
+    private int checksumSize = -1;
+    private BlockMetadataHeader header;
+    private DataChecksum checksum;
+
+    BlockChecksumComputer(DataNode datanode,
+                          ExtendedBlock block) throws IOException {
+      this.datanode = datanode;
+      this.block = block;
+      this.requestLength = block.getNumBytes();
+      Preconditions.checkArgument(requestLength >= 0);
+
+      this.metadataIn = datanode.data.getMetaDataInputStream(block);
+      this.visibleLength = datanode.data.getReplicaVisibleLength(block);
+      this.partialBlk = requestLength < visibleLength;
+
+      int ioFileBufferSize =
+          DFSUtilClient.getIoFileBufferSize(datanode.getConf());
+      this.checksumIn = new DataInputStream(
+          new BufferedInputStream(metadataIn, ioFileBufferSize));
+    }
+
+    protected DataNode getDatanode() {
+      return datanode;
+    }
+
+    protected ExtendedBlock getBlock() {
+      return block;
+    }
+
+    protected long getRequestLength() {
+      return requestLength;
+    }
+
+    protected LengthInputStream getMetadataIn() {
+      return metadataIn;
+    }
+
+    protected DataInputStream getChecksumIn() {
+      return checksumIn;
+    }
+
+    protected long getVisibleLength() {
+      return visibleLength;
+    }
+
+    protected boolean isPartialBlk() {
+      return partialBlk;
+    }
+
+    protected void setOutBytes(byte[] bytes) {
+      this.outBytes = bytes;
+    }
+
+    protected byte[] getOutBytes() {
+      return outBytes;
+    }
+
+    protected int getBytesPerCRC() {
+      return bytesPerCRC;
+    }
+
+    protected DataChecksum.Type getCrcType() {
+      return crcType;
+    }
+
+    protected long getCrcPerBlock() {
+      return crcPerBlock;
+    }
+
+    protected int getChecksumSize() {
+      return checksumSize;
+    }
+
+    protected BlockMetadataHeader getHeader() {
+      return header;
+    }
+
+    protected DataChecksum getChecksum() {
+      return checksum;
+    }
+
+    /**
+     * Perform the block checksum computing.
+     * @throws IOException
+     */
+    abstract void compute() throws IOException;
+
+    /**
+     * Read block metadata header.
+     * @throws IOException
+     */
+    protected void readHeader() throws IOException {
+      //read metadata file
+      header = BlockMetadataHeader.readHeader(checksumIn);
+      checksum = header.getChecksum();
+      checksumSize = checksum.getChecksumSize();
+      bytesPerCRC = checksum.getBytesPerChecksum();
+      crcPerBlock = checksumSize <= 0 ? 0 :
+          (metadataIn.getLength() -
+              BlockMetadataHeader.getHeaderSize()) / checksumSize;
+      crcType = checksum.getChecksumType();
+    }
+
+    /**
+     * Calculate partial block checksum.
+     * @return the CRC bytes of the trailing partial chunk, or null if the
+     *         requested length ends on a chunk boundary
+     * @throws IOException
+     */
+    protected byte[] crcPartialBlock() throws IOException {
+      int partialLength = (int) (requestLength % bytesPerCRC);
+      if (partialLength > 0) {
+        byte[] buf = new byte[partialLength];
+        final InputStream blockIn = datanode.data.getBlockInputStream(block,
+            requestLength - partialLength);
+        try {
+          // Get the CRC of the partialLength.
+          IOUtils.readFully(blockIn, buf, 0, partialLength);
+        } finally {
+          IOUtils.closeStream(blockIn);
+        }
+        checksum.update(buf, 0, partialLength);
+        byte[] partialCrc = new byte[checksumSize];
+        checksum.writeValue(partialCrc, 0, true);
+        return partialCrc;
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Replicated block checksum computer.
+   */
+  static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer {
+
+    ReplicatedBlockChecksumComputer(DataNode datanode,
+                                    ExtendedBlock block) throws IOException {
+      super(datanode, block);
+    }
+
+    @Override
+    void compute() throws IOException {
+      try {
+        readHeader();
+
+        MD5Hash md5out;
+        if (isPartialBlk() && getCrcPerBlock() > 0) {
+          md5out = checksumPartialBlock();
+        } else {
+          md5out = checksumWholeBlock();
+        }
+        setOutBytes(md5out.getDigest());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC()
+              + ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out);
+        }
+      } finally {
+        IOUtils.closeStream(getChecksumIn());
+        IOUtils.closeStream(getMetadataIn());
+      }
+    }
+
+    private MD5Hash checksumWholeBlock() throws IOException {
+      MD5Hash md5out = MD5Hash.digest(getChecksumIn());
+      return md5out;
+    }
+
+    private MD5Hash checksumPartialBlock() throws IOException {
+      byte[] buffer = new byte[4*1024];
+      MessageDigest digester = MD5Hash.getDigester();
+
+      long remaining = (getRequestLength() / getBytesPerCRC())
+          * getChecksumSize();
+      for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+        toDigest = getChecksumIn().read(buffer, 0,
+            (int) Math.min(remaining, buffer.length));
+        if (toDigest < 0) {
+          break;
+        }
+        digester.update(buffer, 0, toDigest);
+      }
+
+      byte[] partialCrc = crcPartialBlock();
+      if (partialCrc != null) {
+        digester.update(partialCrc);
+      }
+
+      return new MD5Hash(digester.digest());
+    }
+  }
+}
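
For context on what the replicated computer returns: the block checksum is the MD5 over the per-chunk CRC records stored in the block's metadata file (after the header), with an extra CRC computed over the trailing bytes when the requested length ends mid-chunk. A self-contained sketch of the whole-block case (illustration only; in the real code readHeader() has already consumed the metadata header):

  import java.io.InputStream;
  import java.security.MessageDigest;

  static byte[] md5OfStoredCrcs(InputStream checksumIn) throws Exception {
    MessageDigest digester = MessageDigest.getInstance("MD5");
    byte[] buffer = new byte[4 * 1024];
    int n;
    while ((n = checksumIn.read(buffer)) > 0) {
      digester.update(buffer, 0, n);  // digest every stored CRC byte
    }
    return digester.digest();
  }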

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index ee079cb..773a64c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.core.Sampler;
 import org.apache.htrace.core.TraceScope;
 
 import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 470d3ca..3e2a25d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3170,6 +3170,16 @@ public class DataNode extends ReconfigurableBase
     return ecWorker;
   }
 
+  IOStreamPair connectToDN(DatanodeInfo datanodeID, int timeout,
+                           ExtendedBlock block,
+                           Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+
+    return DFSUtilClient.connectToDN(datanodeID, timeout, conf, saslClient,
+        NetUtils.getDefaultSocketFactory(getConf()), false,
+        getDataEncryptionKeyFactoryForBlock(block), blockToken);
+  }
+
   /**
    * Get timeout value of each OOB type from configuration
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 0041cd8..1d4a79a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -17,37 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FileDescriptor;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ClosedChannelException;
-import java.security.MessageDigest;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -73,26 +45,52 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StopWatch;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
 
 /**
  * Thread for processing incoming/outgoing data stream.
@@ -886,90 +884,32 @@ class DataXceiver extends Receiver implements Runnable {
     }
   }
 
-  private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
-      long requestLength, DataChecksum checksum, DataInputStream checksumIn)
-      throws IOException {
-    final int bytesPerCRC = checksum.getBytesPerChecksum();
-    final int csize = checksum.getChecksumSize();
-    final byte[] buffer = new byte[4*1024];
-    MessageDigest digester = MD5Hash.getDigester();
-
-    long remaining = requestLength / bytesPerCRC * csize;
-    for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
-      toDigest = checksumIn.read(buffer, 0,
-          (int) Math.min(remaining, buffer.length));
-      if (toDigest < 0) {
-        break;
-      }
-      digester.update(buffer, 0, toDigest);
-    }
-    
-    int partialLength = (int) (requestLength % bytesPerCRC);
-    if (partialLength > 0) {
-      byte[] buf = new byte[partialLength];
-      final InputStream blockIn = datanode.data.getBlockInputStream(block,
-          requestLength - partialLength);
-      try {
-        // Get the CRC of the partialLength.
-        IOUtils.readFully(blockIn, buf, 0, partialLength);
-      } finally {
-        IOUtils.closeStream(blockIn);
-      }
-      checksum.update(buf, 0, partialLength);
-      byte[] partialCrc = new byte[csize];
-      checksum.writeValue(partialCrc, 0, true);
-      digester.update(partialCrc);
-    }
-    return new MD5Hash(digester.digest());
-  }
-
   @Override
-  public void blockChecksum(final ExtendedBlock block,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void blockChecksum(ExtendedBlock block,
+                            Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
     updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
-    // client side now can specify a range of the block for checksum
-    long requestLength = block.getNumBytes();
-    Preconditions.checkArgument(requestLength >= 0);
-    long visibleLength = datanode.data.getReplicaVisibleLength(block);
-    boolean partialBlk = requestLength < visibleLength;
-
-    final LengthInputStream metadataIn = datanode.data
-        .getMetaDataInputStream(block);
-    
-    final DataInputStream checksumIn = new DataInputStream(
-        new BufferedInputStream(metadataIn, ioFileBufferSize));
+
+    BlockChecksumComputer maker =
+        new ReplicatedBlockChecksumComputer(datanode, block);
+
     try {
-      //read metadata file
-      final BlockMetadataHeader header = BlockMetadataHeader
-          .readHeader(checksumIn);
-      final DataChecksum checksum = header.getChecksum();
-      final int csize = checksum.getChecksumSize();
-      final int bytesPerCRC = checksum.getBytesPerChecksum();
-      final long crcPerBlock = csize <= 0 ? 0 : 
-        (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
-
-      final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? 
-          calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
-            : MD5Hash.digest(checksumIn);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
-            + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
-      }
+      maker.compute();
 
       //write reply
       BlockOpResponseProto.newBuilder()
-        .setStatus(SUCCESS)
-        .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()             
-          .setBytesPerCrc(bytesPerCRC)
-          .setCrcPerBlock(crcPerBlock)
-          .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelperClient.convert(checksum.getChecksumType())))
-        .build()
-        .writeDelimitedTo(out);
+          .setStatus(SUCCESS)
+          .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
+              .setBytesPerCrc(maker.getBytesPerCRC())
+              .setCrcPerBlock(maker.getCrcPerBlock())
+              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+          .build()
+          .writeDelimitedTo(out);
       out.flush();
     } catch (IOException ioe) {
       LOG.info("blockChecksum " + block + " received exception " + ioe);
@@ -977,8 +917,6 @@ class DataXceiver extends Receiver implements Runnable {
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
-      IOUtils.closeStream(checksumIn);
-      IOUtils.closeStream(metadataIn);
     }
 
     //update metrics
@@ -1276,7 +1214,7 @@ class DataXceiver extends Receiver implements Runnable {
   /**
    * Wait until the BP is registered, upto the configured amount of time.
    * Throws an exception if times out, which should fail the client request.
-   * @param the requested block
+   * @param block requested block
    */
   void checkAndWaitForBP(final ExtendedBlock block)
       throws IOException {