You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/28 19:36:03 UTC

[35/48] hadoop git commit: HDFS-9694. Make existing DFSClient#getFileChecksum() work for striped blocks. Contributed by Kai Zheng

HDFS-9694. Make existing DFSClient#getFileChecksum() work for striped blocks. Contributed by Kai Zheng


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e5ff0ea7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e5ff0ea7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e5ff0ea7

Branch: refs/heads/HDFS-7240
Commit: e5ff0ea7ba087984262f1f27200ae5bb40d9b838
Parents: fde8ac5
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Sat Mar 26 00:52:50 2016 -0700
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Sat Mar 26 00:52:50 2016 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   1 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  11 +-
 .../apache/hadoop/hdfs/FileChecksumHelper.java  | 187 ++++++++++--
 .../datatransfer/DataTransferProtocol.java      |  16 +-
 .../hadoop/hdfs/protocol/datatransfer/Op.java   |   1 +
 .../hdfs/protocol/datatransfer/Sender.java      |  19 ++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  42 ++-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  12 +
 .../src/main/proto/datatransfer.proto           |   9 +-
 .../hdfs/protocol/datatransfer/Receiver.java    |  28 ++
 .../server/datanode/BlockChecksumHelper.java    | 284 +++++++++++++++----
 .../hdfs/server/datanode/DataXceiver.java       |  43 +++
 12 files changed, 570 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 2c3329e..9d6ab9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -8,6 +8,7 @@
       <Class name="org.apache.hadoop.hdfs.protocol.LocatedBlock"/>
       <Class name="org.apache.hadoop.hdfs.protocol.BlockStoragePolicy"/>
       <Class name="org.apache.hadoop.hdfs.protocol.CorruptFileBlocks"/>
+      <Class name="org.apache.hadoop.hdfs.protocol.StripedBlockInfo"/>
       <Class name="org.apache.hadoop.hdfs.protocol.DirectoryListing"/>
       <Class name="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier"/>
       <Class name="org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey"/>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3506d3a..88bd219 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1704,7 +1704,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   /**
    * Get the checksum of the whole file or a range of the file. Note that the
-   * range always starts from the beginning of the file.
+   * range always starts from the beginning of the file. The file can be
+   * in replicated or striped form. The result can be used to compare two
+   * replicated files or two striped files, but it is not applicable to
+   * comparing two files with different block layouts.
    * @param src The file path
    * @param length the length of the range, i.e., the range is [0, length]
    * @return The checksum
@@ -1717,7 +1720,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
     LocatedBlocks blockLocations = getBlockLocations(src, length);
 
-    FileChecksumHelper.FileChecksumComputer maker =
+    FileChecksumHelper.FileChecksumComputer maker;
+    ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy();
+    maker = ecPolicy != null ?
+        new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src,
+            length, blockLocations, namenode, this, ecPolicy) :
         new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
             blockLocations, namenode, this);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index d15db9f..dfd9393 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -22,10 +22,13 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
@@ -75,7 +78,7 @@ final class FileChecksumHelper {
     private int bytesPerCRC = -1;
     private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
     private long crcPerBlock = 0;
-    private boolean refetchBlocks = false;
+    private boolean isRefetchBlocks = false;
     private int lastRetriedIndex = -1;
 
     /**
@@ -127,8 +130,11 @@ final class FileChecksumHelper {
       return blockLocations;
     }
 
-    void setBlockLocations(LocatedBlocks blockLocations) {
-      this.blockLocations = blockLocations;
+    void refetchBlocks() throws IOException {
+      this.blockLocations = getClient().getBlockLocations(getSrc(),
+          getLength());
+      this.locatedBlocks = getBlockLocations().getLocatedBlocks();
+      this.isRefetchBlocks = false;
     }
 
     int getTimeout() {
@@ -143,10 +149,6 @@ final class FileChecksumHelper {
       return locatedBlocks;
     }
 
-    void setLocatedBlocks(List<LocatedBlock> locatedBlocks) {
-      this.locatedBlocks = locatedBlocks;
-    }
-
     long getRemaining() {
       return remaining;
     }
@@ -180,11 +182,11 @@ final class FileChecksumHelper {
     }
 
     boolean isRefetchBlocks() {
-      return refetchBlocks;
+      return isRefetchBlocks;
     }
 
     void setRefetchBlocks(boolean refetchBlocks) {
-      this.refetchBlocks = refetchBlocks;
+      this.isRefetchBlocks = refetchBlocks;
     }
 
     int getLastRetriedIndex() {
@@ -278,10 +280,7 @@ final class FileChecksumHelper {
            blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
            blockIdx++) {
         if (isRefetchBlocks()) {  // refetch to get fresh tokens
-          setBlockLocations(getClient().getBlockLocations(getSrc(),
-              getLength()));
-          setLocatedBlocks(getBlockLocations().getLocatedBlocks());
-          setRefetchBlocks(false);
+          refetchBlocks();
         }
 
         LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
@@ -380,15 +379,13 @@ final class FileChecksumHelper {
         }
 
         //read md5
-        final MD5Hash md5 = new MD5Hash(
-            checksumData.getMd5().toByteArray());
+        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
         md5.write(getMd5out());
 
         // read crc-type
         final DataChecksum.Type ct;
         if (checksumData.hasCrcType()) {
-          ct = PBHelperClient.convert(checksumData
-              .getCrcType());
+          ct = PBHelperClient.convert(checksumData.getCrcType());
         } else {
           LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
               "inferring checksum by reading first byte");
@@ -413,4 +410,160 @@ final class FileChecksumHelper {
       }
     }
   }
+
+  /**
+   * Striped file checksum computing.
+   */
+  static class StripedFileNonStripedChecksumComputer
+      extends FileChecksumComputer {
+    private final ErasureCodingPolicy ecPolicy;
+    private int bgIdx;
+
+    StripedFileNonStripedChecksumComputer(String src, long length,
+                                          LocatedBlocks blockLocations,
+                                          ClientProtocol namenode,
+                                          DFSClient client,
+                                          ErasureCodingPolicy ecPolicy)
+        throws IOException {
+      super(src, length, blockLocations, namenode, client);
+
+      this.ecPolicy = ecPolicy;
+    }
+
+    @Override
+    void checksumBlocks() throws IOException {
+      int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout();
+      setTimeout(tmpTimeout);
+
+      for (bgIdx = 0;
+           bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) {
+        if (isRefetchBlocks()) {  // refetch to get fresh tokens
+          refetchBlocks();
+        }
+
+        LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx);
+        LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
+
+        if (!checksumBlockGroup(blockGroup)) {
+          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+        }
+      }
+    }
+
+
+    private boolean checksumBlockGroup(
+        LocatedStripedBlock blockGroup) throws IOException {
+      ExtendedBlock block = blockGroup.getBlock();
+      if (getRemaining() < block.getNumBytes()) {
+        block.setNumBytes(getRemaining());
+      }
+      setRemaining(getRemaining() - block.getNumBytes());
+
+      StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
+          blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy);
+      DatanodeInfo[] datanodes = blockGroup.getLocations();
+
+      //try each datanode in the block group.
+      boolean done = false;
+      for (int j = 0; !done && j < datanodes.length; j++) {
+        try {
+          tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
+          done = true;
+        } catch (InvalidBlockTokenException ibte) {
+          if (bgIdx > getLastRetriedIndex()) {
+            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                    + "for file {} for block {} from datanode {}. Will retry "
+                    + "the block once.",
+                getSrc(), block, datanodes[j]);
+            setLastRetriedIndex(bgIdx);
+            done = true; // actually it's not done; but we'll retry
+            bgIdx--; // repeat at bgIdx-th block
+            setRefetchBlocks(true);
+          }
+        } catch (IOException ie) {
+          LOG.warn("src={}" + ", datanodes[{}]={}",
+              getSrc(), j, datanodes[j], ie);
+        }
+      }
+
+      return done;
+    }
+
+    /**
+     * Requests the block group checksum from the given datanode and records
+     * the reply; failures are reported by throwing an IOException.
+     */
+    private void tryDatanode(LocatedStripedBlock blockGroup,
+                             StripedBlockInfo stripedBlockInfo,
+                             DatanodeInfo datanode) throws IOException {
+
+      try (IOStreamPair pair = getClient().connectToDN(datanode,
+          getTimeout(), blockGroup.getBlockToken())) {
+
+        LOG.debug("write to {}: {}, blockGroup={}",
+            datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);
+
+        // get block MD5
+        createSender(pair).blockGroupChecksum(stripedBlockInfo,
+            blockGroup.getBlockToken());
+
+        BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
+            PBHelperClient.vintPrefixed(pair.in));
+
+        String logInfo = "for blockGroup " + blockGroup +
+            " from datanode " + datanode;
+        DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+        OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
+
+        //read byte-per-checksum
+        final int bpc = checksumData.getBytesPerCrc();
+        if (bgIdx == 0) { //first block
+          setBytesPerCRC(bpc);
+        } else {
+          if (bpc != getBytesPerCRC()) {
+            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+                + " but bytesPerCRC=" + getBytesPerCRC());
+          }
+        }
+
+        //read crc-per-block
+        final long cpb = checksumData.getCrcPerBlock();
+        if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
+          setCrcPerBlock(cpb);
+        }
+
+        //read md5
+        final MD5Hash md5 = new MD5Hash(
+            checksumData.getMd5().toByteArray());
+        md5.write(getMd5out());
+
+        // read crc-type
+        final DataChecksum.Type ct;
+        if (checksumData.hasCrcType()) {
+          ct = PBHelperClient.convert(checksumData.getCrcType());
+        } else {
+          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+              "inferring checksum by reading first byte");
+          ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
+        }
+
+        if (bgIdx == 0) {
+          setCrcType(ct);
+        } else if (getCrcType() != DataChecksum.Type.MIXED &&
+            getCrcType() != ct) {
+          // if crc types are mixed in a file
+          setCrcType(DataChecksum.Type.MIXED);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          if (bgIdx == 0) {
+            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+                + ", crcPerBlock=" + getCrcPerBlock());
+          }
+          LOG.debug("got reply from " + datanode + ": md5=" + md5);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 4aa545b..ad3f2ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@@ -197,6 +198,17 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @throws IOException
    */
-  void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException;
+  void blockChecksum(ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+
+  /**
+   * Get striped block group checksum (MD5 of CRC32).
+   *
+   * @param stripedBlockInfo a striped block info.
+   * @param blockToken security token for accessing the block.
+   * @throws IOException
+   */
+  void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
+          Token<BlockTokenIdentifier> blockToken) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
index 511574c..94250e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -38,6 +38,7 @@ public enum Op {
   REQUEST_SHORT_CIRCUIT_FDS((byte)87),
   RELEASE_SHORT_CIRCUIT_FDS((byte)88),
   REQUEST_SHORT_CIRCUIT_SHM((byte)89),
+  BLOCK_GROUP_CHECKSUM((byte)90),
   CUSTOM((byte)127);
 
   /** The code for this operation. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 6545681..585ed99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -28,11 +28,13 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -261,4 +263,21 @@ public class Sender implements DataTransferProtocol {
 
     send(out, Op.BLOCK_CHECKSUM, proto);
   }
+
+  @Override
+  public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
+         Token<BlockTokenIdentifier> blockToken) throws IOException {
+    OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildBaseHeader(
+            stripedBlockInfo.getBlock(), blockToken))
+        .setDatanodes(PBHelperClient.convertToProto(
+            stripedBlockInfo.getDatanodes()))
+        .addAllBlockTokens(PBHelperClient.convert(
+            stripedBlockInfo.getBlockTokens()))
+        .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
+            stripedBlockInfo.getErasureCodingPolicy()))
+        .build();
+
+    send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 38e875c..4759373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -553,10 +553,8 @@ public class PBHelperClient {
           proto.getCorrupt(),
           cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
       List<TokenProto> tokenProtos = proto.getBlockTokensList();
-      Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
-      for (int i = 0; i < indices.length; i++) {
-        blockTokens[i] = convert(tokenProtos.get(i));
-      }
+      Token<BlockTokenIdentifier>[] blockTokens =
+          convertTokens(tokenProtos);
       ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
     }
     lb.setBlockToken(convert(proto.getBlockToken()));
@@ -564,6 +562,18 @@ public class PBHelperClient {
     return lb;
   }
 
+  static public Token<BlockTokenIdentifier>[] convertTokens(
+      List<TokenProto> tokenProtos) {
+
+    @SuppressWarnings("unchecked")
+    Token<BlockTokenIdentifier>[] blockTokens = new Token[tokenProtos.size()];
+    for (int i = 0; i < blockTokens.length; i++) {
+      blockTokens[i] = convert(tokenProtos.get(i));
+    }
+
+    return blockTokens;
+  }
+
   static public DatanodeInfo convert(DatanodeInfoProto di) {
     if (di == null) return null;
     return new DatanodeInfo(
@@ -815,9 +825,7 @@ public class PBHelperClient {
       byte[] indices = sb.getBlockIndices();
       builder.setBlockIndices(PBHelperClient.getByteString(indices));
       Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
-      for (int i = 0; i < indices.length; i++) {
-        builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
-      }
+      builder.addAllBlockTokens(convert(blockTokens));
     }
 
     return builder.setB(PBHelperClient.convert(b.getBlock()))
@@ -825,6 +833,16 @@ public class PBHelperClient {
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
   }
 
+  public static List<TokenProto> convert(
+      Token<BlockTokenIdentifier>[] blockTokens) {
+    List<TokenProto> results = new ArrayList<>(blockTokens.length);
+    for (Token<BlockTokenIdentifier> bt : blockTokens) {
+      results.add(convert(bt));
+    }
+
+    return results;
+  }
+
   public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
     List<StorageTypeProto> cList = proto.getCreationPolicy()
         .getStorageTypesList();
@@ -2500,4 +2518,14 @@ public class PBHelperClient {
         .setId(policy.getId());
     return builder.build();
   }
+
+  public static HdfsProtos.DatanodeInfosProto convertToProto(
+      DatanodeInfo[] datanodeInfos) {
+    HdfsProtos.DatanodeInfosProto.Builder builder =
+        HdfsProtos.DatanodeInfosProto.newBuilder();
+    for (DatanodeInfo datanodeInfo : datanodeInfos) {
+      builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
+    }
+    return builder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 43772e2..0819376 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -76,6 +76,18 @@ public class StripedBlockUtil {
   public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
 
   /**
+   * Parses a striped block group into individual blocks.
+   * @param bg The striped block group
+   * @param ecPolicy The erasure coding policy
+   * @return An array of the blocks in the group
+   */
+  public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+                                               ErasureCodingPolicy ecPolicy) {
+    return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
+        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+  }
+
+  /**
    * This method parses a striped block group into individual blocks.
    *
    * @param bg The striped block group

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index a091d41..522ee06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -74,7 +74,6 @@ message OpReadBlockProto {
   optional CachingStrategyProto cachingStrategy = 5;
 }
 
-
 message ChecksumProto {
   required ChecksumTypeProto type = 1;
   required uint32 bytesPerChecksum = 2;
@@ -149,6 +148,14 @@ message OpBlockChecksumProto {
   required BaseHeaderProto header = 1;
 }
 
+message OpBlockGroupChecksumProto {
+  required BaseHeaderProto header = 1;
+  required DatanodeInfosProto datanodes = 2;
+  // each internal block has a block token
+  repeated hadoop.common.TokenProto blockTokens = 3;
+  required ErasureCodingPolicyProto ecPolicy = 4;
+}
+
 /**
  * An ID uniquely identifying a shared memory segment.
  */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index e040157..b2f26f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -26,11 +26,13 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -111,6 +113,9 @@ public abstract class Receiver implements DataTransferProtocol {
     case BLOCK_CHECKSUM:
       opBlockChecksum(in);
       break;
+    case BLOCK_GROUP_CHECKSUM:
+      opStripedBlockChecksum(in);
+      break;
     case TRANSFER_BLOCK:
       opTransferBlock(in);
       break;
@@ -290,4 +295,27 @@ public abstract class Receiver implements DataTransferProtocol {
       if (traceScope != null) traceScope.close();
     }
   }
+
+  /** Receive OP_BLOCK_GROUP_CHECKSUM. */
+  private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
+    OpBlockGroupChecksumProto proto =
+        OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(
+        PBHelperClient.convert(proto.getHeader().getBlock()),
+        PBHelperClient.convert(proto.getDatanodes()),
+        PBHelperClient.convertTokens(proto.getBlockTokensList()),
+        PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy())
+    );
+
+    try {
+      blockGroupChecksum(stripedBlockInfo,
+          PBHelperClient.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) {
+        traceScope.close();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index 9a5552d..1f1a25c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -19,16 +19,30 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.MessageDigest;
@@ -41,13 +55,87 @@ final class BlockChecksumHelper {
 
   static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
 
-  private BlockChecksumHelper() {}
+  private BlockChecksumHelper() {
+  }
 
   /**
    * The abstract base block checksum computer.
    */
-  static abstract class BlockChecksumComputer {
+  static abstract class AbstractBlockChecksumComputer {
     private final DataNode datanode;
+
+    // Result and checksum parameters; -1/null until a subclass sets them.
+    private byte[] outBytes;
+    private int bytesPerCRC = -1;
+    private DataChecksum.Type crcType = null;
+    private long crcPerBlock = -1;
+    private int checksumSize = -1;
+
+    AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
+      this.datanode = datanode;
+    }
+
+    abstract void compute() throws IOException;
+
+    Sender createSender(IOStreamPair pair) {
+      return new Sender((DataOutputStream) pair.out);
+    }
+
+    DataNode getDatanode() {
+      return datanode;
+    }
+
+    InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
+        throws IOException {
+      return datanode.data.getBlockInputStream(block, seekOffset);
+    }
+
+    void setOutBytes(byte[] bytes) {
+      this.outBytes = bytes;
+    }
+
+    byte[] getOutBytes() {
+      return outBytes;
+    }
+
+    void setBytesPerCRC(int bytesPerCRC) {
+      this.bytesPerCRC = bytesPerCRC;
+    }
+
+    int getBytesPerCRC() {
+      return bytesPerCRC;
+    }
+
+    void setCrcType(DataChecksum.Type crcType) {
+      this.crcType = crcType;
+    }
+
+    DataChecksum.Type getCrcType() {
+      return crcType;
+    }
+
+    void setCrcPerBlock(long crcPerBlock) {
+      this.crcPerBlock = crcPerBlock;
+    }
+
+    long getCrcPerBlock() {
+      return crcPerBlock;
+    }
+
+    void setChecksumSize(int checksumSize) {
+      this.checksumSize = checksumSize;
+    }
+
+    int getChecksumSize() {
+      return checksumSize;
+    }
+  }
+
+  /**
+   * The abstract base checksum computer for a single block and its metadata.
+   */
+  static abstract class BlockChecksumComputer
+      extends AbstractBlockChecksumComputer {
     private final ExtendedBlock block;
     // client side now can specify a range of the block for checksum
     private final long requestLength;
@@ -56,17 +144,12 @@ final class BlockChecksumHelper {
     private final long visibleLength;
     private final boolean partialBlk;
 
-    private byte[] outBytes;
-    private int bytesPerCRC = -1;
-    private DataChecksum.Type crcType = null;
-    private long crcPerBlock = -1;
-    private int checksumSize = -1;
     private BlockMetadataHeader header;
     private DataChecksum checksum;
 
     BlockChecksumComputer(DataNode datanode,
                           ExtendedBlock block) throws IOException {
-      this.datanode = datanode;
+      super(datanode);
       this.block = block;
       this.requestLength = block.getNumBytes();
       Preconditions.checkArgument(requestLength >= 0);
@@ -81,98 +164,80 @@ final class BlockChecksumHelper {
           new BufferedInputStream(metadataIn, ioFileBufferSize));
     }
 
-    protected DataNode getDatanode() {
-      return datanode;
+    Sender createSender(IOStreamPair pair) {
+      DataOutputStream out = (DataOutputStream) pair.out;
+      return new Sender(out);
     }
 
-    protected ExtendedBlock getBlock() {
+
+    ExtendedBlock getBlock() {
       return block;
     }
 
-    protected long getRequestLength() {
+    long getRequestLength() {
       return requestLength;
     }
 
-    protected LengthInputStream getMetadataIn() {
+    LengthInputStream getMetadataIn() {
       return metadataIn;
     }
 
-    protected DataInputStream getChecksumIn() {
+    DataInputStream getChecksumIn() {
       return checksumIn;
     }
 
-    protected long getVisibleLength() {
+    long getVisibleLength() {
       return visibleLength;
     }
 
-    protected boolean isPartialBlk() {
+    boolean isPartialBlk() {
       return partialBlk;
     }
 
-    protected void setOutBytes(byte[] bytes) {
-      this.outBytes = bytes;
-    }
-
-    protected byte[] getOutBytes() {
-      return outBytes;
-    }
-
-    protected int getBytesPerCRC() {
-      return bytesPerCRC;
-    }
-
-    protected DataChecksum.Type getCrcType() {
-      return crcType;
-    }
-
-    protected long getCrcPerBlock() {
-      return crcPerBlock;
-    }
-
-    protected int getChecksumSize() {
-      return checksumSize;
-    }
-
-    protected BlockMetadataHeader getHeader() {
+    BlockMetadataHeader getHeader() {
       return header;
     }
 
-    protected DataChecksum getChecksum() {
+    DataChecksum getChecksum() {
       return checksum;
     }
 
     /**
      * Perform the block checksum computing.
+     *
      * @throws IOException
      */
     abstract void compute() throws IOException;
 
     /**
      * Read block metadata header.
+     *
      * @throws IOException
      */
-    protected void readHeader() throws IOException {
+    void readHeader() throws IOException {
       //read metadata file
       header = BlockMetadataHeader.readHeader(checksumIn);
       checksum = header.getChecksum();
-      checksumSize = checksum.getChecksumSize();
-      bytesPerCRC = checksum.getBytesPerChecksum();
-      crcPerBlock = checksumSize <= 0 ? 0 :
+      setChecksumSize(checksum.getChecksumSize());
+      setBytesPerCRC(checksum.getBytesPerChecksum());
+      long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 :
           (metadataIn.getLength() -
-              BlockMetadataHeader.getHeaderSize()) / checksumSize;
-      crcType = checksum.getChecksumType();
+              BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize();
+      setCrcPerBlock(crcPerBlock);
+      setCrcType(checksum.getChecksumType());
     }
 
     /**
      * Calculate partial block checksum.
+     *
      * @return
      * @throws IOException
      */
-    protected byte[] crcPartialBlock() throws IOException {
-      int partialLength = (int) (requestLength % bytesPerCRC);
+    byte[] crcPartialBlock() throws IOException {
+      int partialLength = (int) (requestLength % getBytesPerCRC());
       if (partialLength > 0) {
         byte[] buf = new byte[partialLength];
-        final InputStream blockIn = datanode.data.getBlockInputStream(block,
+        final InputStream blockIn = getBlockInputStream(block,
             requestLength - partialLength);
         try {
           // Get the CRC of the partialLength.
@@ -181,7 +246,7 @@ final class BlockChecksumHelper {
           IOUtils.closeStream(blockIn);
         }
         checksum.update(buf, 0, partialLength);
-        byte[] partialCrc = new byte[checksumSize];
+        byte[] partialCrc = new byte[getChecksumSize()];
         checksum.writeValue(partialCrc, 0, true);
         return partialCrc;
       }
@@ -229,7 +294,7 @@ final class BlockChecksumHelper {
     }
 
     private MD5Hash checksumPartialBlock() throws IOException {
-      byte[] buffer = new byte[4*1024];
+      byte[] buffer = new byte[4 * 1024];
       MessageDigest digester = MD5Hash.getDigester();
 
       long remaining = (getRequestLength() / getBytesPerCRC())
@@ -251,4 +316,115 @@ final class BlockChecksumHelper {
       return new MD5Hash(digester.digest());
     }
   }
-}
+
+  /**
+   * Computes a checksum for a striped block group by asking the datanode of
+   * each data block for that block's MD5 and digesting the collected MD5s.
+   */
+  static class BlockGroupNonStripedChecksumComputer
+      extends AbstractBlockChecksumComputer {
+
+    private final ExtendedBlock blockGroup;
+    private final ErasureCodingPolicy ecPolicy;
+    private final DatanodeInfo[] datanodes;
+    private final Token<BlockTokenIdentifier>[] blockTokens;
+
+    private final DataOutputBuffer md5writer = new DataOutputBuffer();
+
+    BlockGroupNonStripedChecksumComputer(DataNode datanode,
+                                         StripedBlockInfo stripedBlockInfo)
+        throws IOException {
+      super(datanode);
+      this.blockGroup = stripedBlockInfo.getBlock();
+      this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
+      this.datanodes = stripedBlockInfo.getDatanodes();
+      this.blockTokens = stripedBlockInfo.getBlockTokens();
+    }
+
+    @Override
+    void compute() throws IOException {
+      for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) {
+        ExtendedBlock block =
+            StripedBlockUtil.constructInternalBlock(blockGroup,
+            ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx);
+        DatanodeInfo targetDatanode = datanodes[idx];
+        Token<BlockTokenIdentifier> blockToken = blockTokens[idx];
+        checksumBlock(block, idx, blockToken, targetDatanode);
+      }
+
+      // NOTE(review): getData() may include bytes past getLength() - confirm.
+      MD5Hash md5out = MD5Hash.digest(md5writer.getData());
+      setOutBytes(md5out.getDigest());
+    }
+
+    private void checksumBlock(ExtendedBlock block, int blockIdx,
+                               Token<BlockTokenIdentifier> blockToken,
+                               DatanodeInfo targetDatanode) throws IOException {
+      int timeout = 3000; // NOTE(review): hard-coded; unit presumably ms
+      try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode,
+          timeout, block, blockToken)) {
+
+        LOG.debug("write to {}: {}, block={}",
+            targetDatanode, Op.BLOCK_CHECKSUM, block);
+
+        // ask the target datanode for this internal block's MD5
+        createSender(pair).blockChecksum(block, blockToken);
+
+        final DataTransferProtos.BlockOpResponseProto reply =
+            DataTransferProtos.BlockOpResponseProto.parseFrom(
+                PBHelperClient.vintPrefixed(pair.in));
+
+        String logInfo = "for block " + block
+            + " from datanode " + targetDatanode;
+        DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+        DataTransferProtos.OpBlockChecksumResponseProto checksumData =
+            reply.getChecksumResponse();
+
+        // read byte-per-checksum; every block must agree with the first one
+        final int bpc = checksumData.getBytesPerCrc();
+        if (blockIdx == 0) { //first block
+          setBytesPerCRC(bpc);
+        } else if (bpc != getBytesPerCRC()) {
+          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+              + " but bytesPerCRC=" + getBytesPerCRC());
+        }
+
+        // read crc-per-block; only the first block's value is recorded
+        final long cpb = checksumData.getCrcPerBlock();
+        if (blockIdx == 0) {
+          setCrcPerBlock(cpb);
+        }
+
+        // read md5 and append it to the buffer digested by compute()
+        final MD5Hash md5 = new MD5Hash(
+            checksumData.getMd5().toByteArray());
+        md5.write(md5writer);
+
+        // read crc-type
+        final DataChecksum.Type ct;
+        if (checksumData.hasCrcType()) {
+          ct = PBHelperClient.convert(checksumData.getCrcType());
+        } else {
+          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+              "no crc type in reply, using the default checksum type");
+          ct = DataChecksum.Type.DEFAULT;
+        }
+
+        if (blockIdx == 0) { // first block
+          setCrcType(ct);
+        } else if (getCrcType() != DataChecksum.Type.MIXED &&
+            getCrcType() != ct) {
+          // if crc types are mixed in a file
+          setCrcType(DataChecksum.Type.MIXED);
+        }
+
+        if (blockIdx == 0) {
+          LOG.debug("set bytesPerCRC={}, crcPerBlock={}",
+              getBytesPerCRC(), getCrcPerBlock());
+        }
+        LOG.debug("got reply from {}: md5={}", targetDatanode, md5);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5ff0ea7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 1d4a79a..63bf5ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -46,7 +47,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
 import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
@@ -924,6 +927,46 @@ class DataXceiver extends Receiver implements Runnable {
   }
 
   @Override
+  public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
+                                 final Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+    updateCurrentThreadName("Getting checksum for block group " +
+        stripedBlockInfo.getBlock());
+    final DataOutputStream out = new DataOutputStream(getOutputStream());
+    checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken,
+        Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
+
+    AbstractBlockChecksumComputer maker =
+        new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
+
+    try {
+      maker.compute();
+
+      // reply with the checksum parameters and MD5 gathered by compute()
+      BlockOpResponseProto.newBuilder()
+          .setStatus(SUCCESS)
+          .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
+              .setBytesPerCrc(maker.getBytesPerCRC())
+              .setCrcPerBlock(maker.getCrcPerBlock())
+              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+          .build()
+          .writeDelimitedTo(out);
+      out.flush();
+    } catch (IOException ioe) {
+      LOG.info("blockGroupChecksum " + stripedBlockInfo.getBlock() +
+          " received exception " + ioe);
+      incrDatanodeNetworkErrors();
+      throw ioe;
+    } finally {
+      IOUtils.closeStream(out);
+    }
+
+    // update metrics; reached only when no exception was rethrown above
+    datanode.metrics.addBlockChecksumOp(elapsed());
+  }
+
+  @Override
   public void copyBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);