Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/06/01 04:58:12 UTC

hadoop git commit: HDFS-9833. Erasure coding: recomputing block checksum on the fly by reconstructing the missed/corrupt block data. Contributed by Rakesh R.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 8ceb06e23 -> d749cf65e


HDFS-9833. Erasure coding: recomputing block checksum on the fly by reconstructing the missed/corrupt block data. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d749cf65
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d749cf65
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d749cf65

Branch: refs/heads/trunk
Commit: d749cf65e1ab0e0daf5be86931507183f189e855
Parents: 8ceb06e
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Jun 2 12:56:21 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Thu Jun 2 12:56:21 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/FileChecksumHelper.java  |   3 +-
 .../hadoop/hdfs/protocol/StripedBlockInfo.java  |  10 +-
 .../hdfs/protocol/datatransfer/Sender.java      |   2 +
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  16 ++
 .../src/main/proto/datatransfer.proto           |   1 +
 .../hdfs/protocol/datatransfer/Receiver.java    |   1 +
 .../server/datanode/BlockChecksumHelper.java    | 172 ++++++++++++++-----
 .../erasurecode/ErasureCodingWorker.java        |  15 +-
 .../StripedBlockChecksumReconstructor.java      | 129 ++++++++++++++
 .../erasurecode/StripedBlockReconstructor.java  | 119 +++++++++++++
 .../datanode/erasurecode/StripedReader.java     |  22 +--
 .../erasurecode/StripedReconstructionInfo.java  |  99 +++++++++++
 .../erasurecode/StripedReconstructor.java       | 169 +++++++-----------
 .../datanode/erasurecode/StripedWriter.java     |  29 ++--
 .../hdfs/TestDecommissionWithStriped.java       |  47 +++++
 .../apache/hadoop/hdfs/TestFileChecksum.java    |  41 ++++-
 16 files changed, 675 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
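
For context, this change lets getFileChecksum() on an erasure-coded file succeed even when an internal block of a block group is missing or corrupt: the DataNode serving the block-group checksum request reconstructs the missing block data in memory and folds its checksum into the group result. A minimal client-side sketch of that scenario (the path and byte count are hypothetical; the calls are the same FileSystem/DistributedFileSystem APIs exercised by the tests below):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileChecksum;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class EcChecksumExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      // Hypothetical erasure-coded file; assumes an EC policy is set on its directory.
      Path ecFile = new Path("/ec/dir/file");

      // Checksum over the first 10 MB of the file. With this change the call
      // still succeeds when a DataNode holding one internal block is down,
      // because the serving DataNode reconstructs that block's data on the fly.
      FileChecksum checksum = fs.getFileChecksum(ecFile, 10 * 1024 * 1024);
      System.out.println("checksum = " + checksum);
    }
  }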


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index dfd9393..c213fa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -460,7 +460,8 @@ final class FileChecksumHelper {
       setRemaining(getRemaining() - block.getNumBytes());
 
       StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
-          blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy);
+          blockGroup.getLocations(), blockGroup.getBlockTokens(),
+          blockGroup.getBlockIndices(), ecPolicy);
       DatanodeInfo[] datanodes = blockGroup.getLocations();
 
       //try each datanode in the block group.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
index 74e8081..e46fabc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
@@ -32,14 +32,16 @@ public class StripedBlockInfo {
   private final ExtendedBlock block;
   private final DatanodeInfo[] datanodes;
   private final Token<BlockTokenIdentifier>[] blockTokens;
+  private final byte[] blockIndices;
   private final ErasureCodingPolicy ecPolicy;
 
   public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes,
-                          Token<BlockTokenIdentifier>[] blockTokens,
-                          ErasureCodingPolicy ecPolicy) {
+      Token<BlockTokenIdentifier>[] blockTokens, byte[] blockIndices,
+      ErasureCodingPolicy ecPolicy) {
     this.block = block;
     this.datanodes = datanodes;
     this.blockTokens = blockTokens;
+    this.blockIndices = blockIndices;
     this.ecPolicy = ecPolicy;
   }
 
@@ -55,6 +57,10 @@ public class StripedBlockInfo {
     return blockTokens;
   }
 
+  public byte[] getBlockIndices() {
+    return blockIndices;
+  }
+
   public ErasureCodingPolicy getErasureCodingPolicy() {
     return ecPolicy;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 585ed99..bc73bfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -274,6 +274,8 @@ public class Sender implements DataTransferProtocol {
             stripedBlockInfo.getDatanodes()))
         .addAllBlockTokens(PBHelperClient.convert(
             stripedBlockInfo.getBlockTokens()))
+        .addAllBlockIndices(PBHelperClient
+            .convertBlockIndices(stripedBlockInfo.getBlockIndices()))
         .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
             stripedBlockInfo.getErasureCodingPolicy()))
         .build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index a05567b..d5bb1e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -853,6 +853,22 @@ public class PBHelperClient {
     return results;
   }
 
+  public static List<Integer> convertBlockIndices(byte[] blockIndices) {
+    List<Integer> results = new ArrayList<>(blockIndices.length);
+    for (byte bt : blockIndices) {
+      results.add(Integer.valueOf(bt));
+    }
+    return results;
+  }
+
+  public static byte[] convertBlockIndices(List<Integer> blockIndices) {
+    byte[] blkIndices = new byte[blockIndices.size()];
+    for (int i = 0; i < blockIndices.size(); i++) {
+      blkIndices[i] = (byte) blockIndices.get(i).intValue();
+    }
+    return blkIndices;
+  }
+
   public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
     List<StorageTypeProto> cList = proto.getCreationPolicy()
         .getStorageTypesList();
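
The two new helpers above are symmetric. A quick standalone sketch of the round trip, from the byte indices of the internal blocks to the repeated uint32 proto field and back, assuming PBHelperClient is on the classpath:

  import java.util.Arrays;
  import java.util.List;
  import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

  public class BlockIndicesRoundTrip {
    public static void main(String[] args) {
      // Internal block indices of a striped block group, e.g. RS-6-3 with one gap.
      byte[] indices = {0, 2, 3, 4, 5, 6, 7, 8};

      // Sender side: byte[] -> List<Integer> for the repeated uint32 proto field.
      List<Integer> wire = PBHelperClient.convertBlockIndices(indices);

      // Receiver side: List<Integer> -> byte[] handed to StripedBlockInfo.
      byte[] restored = PBHelperClient.convertBlockIndices(wire);

      System.out.println(Arrays.equals(indices, restored)); // true
    }
  }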

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 522ee06..1407351 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -154,6 +154,7 @@ message OpBlockGroupChecksumProto {
   // each internal block has a block token
   repeated hadoop.common.TokenProto blockTokens = 3;
   required ErasureCodingPolicyProto ecPolicy = 4;
+  repeated uint32 blockIndices = 5;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index b2f26f8..8b863f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -306,6 +306,7 @@ public abstract class Receiver implements DataTransferProtocol {
         PBHelperClient.convert(proto.getHeader().getBlock()),
         PBHelperClient.convert(proto.getDatanodes()),
         PBHelperClient.convertTokens(proto.getBlockTokensList()),
+        PBHelperClient.convertBlockIndices(proto.getBlockIndicesList()),
         PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy())
     );
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index 1f1a25c..ec6bbb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -30,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,11 +50,14 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.MessageDigest;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Utilities for Block checksum computing, for both replicated and striped
  * blocks.
  */
+@InterfaceAudience.Private
 final class BlockChecksumHelper {
 
   static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
@@ -327,6 +334,7 @@ final class BlockChecksumHelper {
     private final ErasureCodingPolicy ecPolicy;
     private final DatanodeInfo[] datanodes;
     private final Token<BlockTokenIdentifier>[] blockTokens;
+    private final byte[] blockIndices;
 
     private final DataOutputBuffer md5writer = new DataOutputBuffer();
 
@@ -338,17 +346,61 @@ final class BlockChecksumHelper {
       this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
       this.datanodes = stripedBlockInfo.getDatanodes();
       this.blockTokens = stripedBlockInfo.getBlockTokens();
+      this.blockIndices = stripedBlockInfo.getBlockIndices();
+    }
+
+    private static class LiveBlockInfo {
+      private final DatanodeInfo dn;
+      private final Token<BlockTokenIdentifier> token;
+
+      LiveBlockInfo(DatanodeInfo dn, Token<BlockTokenIdentifier> token) {
+        this.dn = dn;
+        this.token = token;
+      }
+
+      DatanodeInfo getDn() {
+        return dn;
+      }
+
+      Token<BlockTokenIdentifier> getToken() {
+        return token;
+      }
     }
 
     @Override
     void compute() throws IOException {
-      for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) {
-        ExtendedBlock block =
-            StripedBlockUtil.constructInternalBlock(blockGroup,
-            ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx);
-        DatanodeInfo targetDatanode = datanodes[idx];
-        Token<BlockTokenIdentifier> blockToken = blockTokens[idx];
-        checksumBlock(block, idx, blockToken, targetDatanode);
+      assert datanodes.length == blockIndices.length;
+
+      Map<Byte, LiveBlockInfo> liveDns = new HashMap<>(datanodes.length);
+      int blkIndxLen = blockIndices.length;
+      int numDataUnits = ecPolicy.getNumDataUnits();
+      // Prepare the live datanode list. Missing data blocks will be
+      // reconstructed and their checksums recalculated.
+      for (int idx = 0; idx < blkIndxLen; idx++) {
+        liveDns.put(blockIndices[idx],
+            new LiveBlockInfo(datanodes[idx], blockTokens[idx]));
+      }
+      for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
+        try {
+          LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx);
+          if (liveBlkInfo == null) {
+            // reconstruct block and calculate checksum for missing node
+            recalculateChecksum(idx);
+          } else {
+            try {
+              ExtendedBlock block = StripedBlockUtil.constructInternalBlock(
+                  blockGroup, ecPolicy.getCellSize(), numDataUnits, idx);
+              checksumBlock(block, idx, liveBlkInfo.getToken(),
+                  liveBlkInfo.getDn());
+            } catch (IOException ioe) {
+              LOG.warn("Exception while reading checksum", ioe);
+              // reconstruct block and calculate checksum for the failed node
+              recalculateChecksum(idx);
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to get the checksum", e);
+        }
       }
 
       MD5Hash md5out = MD5Hash.digest(md5writer.getData());
@@ -379,52 +431,90 @@ final class BlockChecksumHelper {
         DataTransferProtos.OpBlockChecksumResponseProto checksumData =
             reply.getChecksumResponse();
 
-        //read byte-per-checksum
-        final int bpc = checksumData.getBytesPerCrc();
-        if (blockIdx == 0) { //first block
-          setBytesPerCRC(bpc);
-        } else if (bpc != getBytesPerCRC()) {
-          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-              + " but bytesPerCRC=" + getBytesPerCRC());
-        }
-
-        //read crc-per-block
-        final long cpb = checksumData.getCrcPerBlock();
-        if (blockIdx == 0) {
-          setCrcPerBlock(cpb);
-        }
-
-        //read md5
-        final MD5Hash md5 = new MD5Hash(
-            checksumData.getMd5().toByteArray());
-        md5.write(md5writer);
-
         // read crc-type
         final DataChecksum.Type ct;
         if (checksumData.hasCrcType()) {
           ct = PBHelperClient.convert(checksumData.getCrcType());
         } else {
-          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-              "inferring checksum by reading first byte");
+          LOG.debug("Retrieving checksum from an earlier-version DataNode: "
+              + "inferring checksum by reading first byte");
           ct = DataChecksum.Type.DEFAULT;
         }
 
-        if (blockIdx == 0) { // first block
-          setCrcType(ct);
-        } else if (getCrcType() != DataChecksum.Type.MIXED &&
-            getCrcType() != ct) {
-          // if crc types are mixed in a file
-          setCrcType(DataChecksum.Type.MIXED);
-        }
-
+        setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(),
+            checksumData.getCrcPerBlock(), ct);
+        //read md5
+        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
+        md5.write(md5writer);
         if (LOG.isDebugEnabled()) {
-          if (blockIdx == 0) {
-            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-                + ", crcPerBlock=" + getCrcPerBlock());
-          }
           LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
         }
       }
     }
+
+    /**
+     * Reconstruct this data block and recalculate checksum.
+     *
+     * @param errBlkIndex
+     *          error block index to reconstruct and recalculate the checksum for.
+     * @throws IOException
+     */
+    private void recalculateChecksum(int errBlkIndex) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recalculate checksum for the missing/failed block index "
+            + errBlkIndex);
+      }
+      byte[] errIndices = new byte[1];
+      errIndices[0] = (byte) errBlkIndex;
+      StripedReconstructionInfo stripedReconInfo =
+          new StripedReconstructionInfo(
+          blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
+      final StripedBlockChecksumReconstructor checksumRecon =
+          new StripedBlockChecksumReconstructor(
+          getDatanode().getErasureCodingWorker(), stripedReconInfo,
+          md5writer);
+      checksumRecon.reconstruct();
+
+      DataChecksum checksum = checksumRecon.getChecksum();
+      long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
+          : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
+      setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(),
+          crcPerBlock, checksum.getChecksumType());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recalculated checksum for the block index " + errBlkIndex
+            + ": md5=" + checksumRecon.getMD5());
+      }
+    }
+
+    private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
+        final long cpb, DataChecksum.Type ct) throws IOException {
+      //read byte-per-checksum
+      if (blockIdx == 0) { //first block
+        setBytesPerCRC(bpc);
+      } else if (bpc != getBytesPerCRC()) {
+        throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+            + " but bytesPerCRC=" + getBytesPerCRC());
+      }
+
+      //read crc-per-block
+      if (blockIdx == 0) {
+        setCrcPerBlock(cpb);
+      }
+
+      if (blockIdx == 0) { // first block
+        setCrcType(ct);
+      } else if (getCrcType() != DataChecksum.Type.MIXED &&
+          getCrcType() != ct) {
+        // if crc types are mixed in a file
+        setCrcType(DataChecksum.Type.MIXED);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        if (blockIdx == 0) {
+          LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+              + ", crcPerBlock=" + getCrcPerBlock());
+        }
+      }
+    }
   }
 }
\ No newline at end of file
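
The group checksum stays a composite MD5-of-block-MD5s: whether an internal block's MD5 comes from a live DataNode's reply or from recalculateChecksum(), it is appended to the same md5writer, and the final digest is taken over the concatenation. A small illustration of that composition, assuming the per-block MD5s are already at hand (here derived from placeholder strings):

  import org.apache.hadoop.io.DataOutputBuffer;
  import org.apache.hadoop.io.MD5Hash;

  public class CompositeMd5Example {
    public static void main(String[] args) throws Exception {
      // Hypothetical per-internal-block MD5s, in block-index order; in the real
      // path they come either from OpBlockChecksumResponseProto replies or from
      // StripedBlockChecksumReconstructor for reconstructed blocks.
      byte[][] blockMd5s = {
          MD5Hash.digest("block-0-data".getBytes()).getDigest(),
          MD5Hash.digest("block-1-data".getBytes()).getDigest(),
          MD5Hash.digest("block-2-data".getBytes()).getDigest(),
      };

      DataOutputBuffer md5writer = new DataOutputBuffer();
      for (byte[] md5 : blockMd5s) {
        new MD5Hash(md5).write(md5writer);   // same append the helper does
      }

      // Final block-group checksum: MD5 over the concatenated per-block MD5s.
      MD5Hash md5out = MD5Hash.digest(md5writer.getData());
      System.out.println("group md5 = " + md5out);
    }
  }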

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index e7c5abc..aacbb2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -116,19 +116,24 @@ public final class ErasureCodingWorker {
    */
   public void processErasureCodingTasks(
       Collection<BlockECReconstructionInfo> ecTasks) {
-    for (BlockECReconstructionInfo reconstructionInfo : ecTasks) {
+    for (BlockECReconstructionInfo reconInfo : ecTasks) {
       try {
-        final StripedReconstructor task =
-            new StripedReconstructor(this, reconstructionInfo);
+        StripedReconstructionInfo stripedReconInfo =
+            new StripedReconstructionInfo(
+            reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
+            reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
+            reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes());
+        final StripedBlockReconstructor task =
+            new StripedBlockReconstructor(this, stripedReconInfo);
         if (task.hasValidTargets()) {
           stripedReconstructionPool.submit(task);
         } else {
           LOG.warn("No missing internal block. Skip reconstruction for task:{}",
-              reconstructionInfo);
+              reconInfo);
         }
       } catch (Throwable e) {
         LOG.warn("Failed to reconstruct striped block {}",
-            reconstructionInfo.getExtendedBlock().getLocalBlock(), e);
+            reconInfo.getExtendedBlock().getLocalBlock(), e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
new file mode 100644
index 0000000..1b6758b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * StripedBlockChecksumReconstructor reconstructs one or more missing striped
+ * blocks in a striped block group; the number of live striped blocks must be
+ * no less than the number of data blocks. The checksum is then recalculated
+ * using the newly reconstructed block data.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumReconstructor extends StripedReconstructor {
+
+  private ByteBuffer targetBuffer;
+  private final byte[] targetIndices;
+
+  private byte[] checksumBuf;
+  private DataOutputBuffer checksumWriter;
+  private MD5Hash md5;
+  private long checksumDataLen;
+
+  public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo,
+      DataOutputBuffer checksumWriter) throws IOException {
+    super(worker, stripedReconInfo);
+    this.targetIndices = stripedReconInfo.getTargetIndices();
+    assert targetIndices != null;
+    this.checksumWriter = checksumWriter;
+    init();
+  }
+
+  private void init() throws IOException {
+    getStripedReader().init();
+    // allocate buffer to keep the reconstructed block data
+    targetBuffer = allocateBuffer(getBufferSize());
+    long maxTargetLen = 0L;
+    for (int targetIndex : targetIndices) {
+      maxTargetLen = Math.max(maxTargetLen, getBlockLen(targetIndex));
+    }
+    setMaxTargetLength(maxTargetLen);
+    int checksumSize = getChecksum().getChecksumSize();
+    int bytesPerChecksum = getChecksum().getBytesPerChecksum();
+    int tmpLen = checksumSize * (getBufferSize() / bytesPerChecksum);
+    checksumBuf = new byte[tmpLen];
+  }
+
+  public void reconstruct() throws IOException {
+    MessageDigest digester = MD5Hash.getDigester();
+    while (getPositionInBlock() < getMaxTargetLength()) {
+      long remaining = getMaxTargetLength() - getPositionInBlock();
+      final int toReconstructLen = (int) Math
+          .min(getStripedReader().getBufferSize(), remaining);
+      // step1: read from minimum source DNs required for reconstruction.
+      // The returned success list is the source DNs we do real read from
+      getStripedReader().readMinimumSources(toReconstructLen);
+
+      // step2: decode to reconstruct targets
+      reconstructTargets(toReconstructLen);
+
+      // step3: calculate checksum
+      getChecksum().calculateChunkedSums(targetBuffer.array(), 0,
+          targetBuffer.remaining(), checksumBuf, 0);
+
+      // step4: updates the digest using the checksum array of bytes
+      digester.update(checksumBuf, 0, checksumBuf.length);
+      checksumDataLen += checksumBuf.length;
+      updatePositionInBlock(toReconstructLen);
+      clearBuffers();
+    }
+
+    byte[] digest = digester.digest();
+    md5 = new MD5Hash(digest);
+    md5.write(checksumWriter);
+  }
+
+  private void reconstructTargets(int toReconstructLen) {
+    initDecoderIfNecessary();
+
+    ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
+
+    ByteBuffer[] outputs = new ByteBuffer[1];
+    targetBuffer.limit(toReconstructLen);
+    outputs[0] = targetBuffer;
+    int[] tarIndices = new int[targetIndices.length];
+    for (int i = 0; i < targetIndices.length; i++) {
+      tarIndices[i] = targetIndices[i];
+    }
+    getDecoder().decode(inputs, tarIndices, outputs);
+  }
+
+  /**
+   * Clear all associated buffers.
+   */
+  private void clearBuffers() {
+    getStripedReader().clearBuffers();
+    targetBuffer.clear();
+  }
+
+  public MD5Hash getMD5() {
+    return md5;
+  }
+
+  public long getChecksumDataLen() {
+    return checksumDataLen;
+  }
+}
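
Steps 3 and 4 of reconstruct() above boil down to chunked CRCs over the reconstructed buffer, folded into a running MD5. A standalone sketch of that pairing; the checksum type and chunk size here are assumptions, whereas the real code takes them from the striped reader's DataChecksum:

  import java.security.MessageDigest;
  import org.apache.hadoop.io.MD5Hash;
  import org.apache.hadoop.util.DataChecksum;

  public class ChunkedSumDigestExample {
    public static void main(String[] args) {
      // Assumed checksum parameters (CRC32C, 512 bytes per checksum).
      DataChecksum checksum =
          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);

      byte[] reconstructed = new byte[4096];        // stand-in for targetBuffer
      int chunks = reconstructed.length / checksum.getBytesPerChecksum();
      byte[] checksumBuf = new byte[chunks * checksum.getChecksumSize()];

      // step3: CRC per 512-byte chunk of the reconstructed data
      checksum.calculateChunkedSums(reconstructed, 0, reconstructed.length,
          checksumBuf, 0);

      // step4: fold the CRC bytes into the MD5 for this internal block
      MessageDigest digester = MD5Hash.getDigester();
      digester.update(checksumBuf, 0, checksumBuf.length);
      MD5Hash md5 = new MD5Hash(digester.digest());
      System.out.println("block md5-of-crcs = " + md5);
    }
  }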

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
new file mode 100644
index 0000000..b800bef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * StripedBlockReconstructor reconstructs one or more missing striped blocks in
+ * a striped block group; the number of live striped blocks must be no less
+ * than the number of data blocks.
+ */
+@InterfaceAudience.Private
+class StripedBlockReconstructor extends StripedReconstructor
+    implements Runnable {
+
+  private StripedWriter stripedWriter;
+
+  StripedBlockReconstructor(ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo) {
+    super(worker, stripedReconInfo);
+
+    stripedWriter = new StripedWriter(this, getDatanode(),
+        getConf(), stripedReconInfo);
+  }
+
+  boolean hasValidTargets() {
+    return stripedWriter.hasValidTargets();
+  }
+
+  @Override
+  public void run() {
+    getDatanode().incrementXmitsInProgress();
+    try {
+      getStripedReader().init();
+
+      stripedWriter.init();
+
+      reconstruct();
+
+      stripedWriter.endTargetBlocks();
+
+      // Currently we don't check the acks for packets; this is similar to
+      // block replication.
+    } catch (Throwable e) {
+      LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e);
+      getDatanode().getMetrics().incrECFailedReconstructionTasks();
+    } finally {
+      getDatanode().decrementXmitsInProgress();
+      getDatanode().getMetrics().incrECReconstructionTasks();
+      getStripedReader().close();
+      stripedWriter.close();
+    }
+  }
+
+  void reconstruct() throws IOException {
+    while (getPositionInBlock() < getMaxTargetLength()) {
+      long remaining = getMaxTargetLength() - getPositionInBlock();
+      final int toReconstructLen =
+          (int) Math.min(getStripedReader().getBufferSize(), remaining);
+
+      // step1: read from minimum source DNs required for reconstruction.
+      // The returned success list is the source DNs we do real read from
+      getStripedReader().readMinimumSources(toReconstructLen);
+
+      // step2: decode to reconstruct targets
+      reconstructTargets(toReconstructLen);
+
+      // step3: transfer data
+      if (stripedWriter.transferData2Targets() == 0) {
+        String error = "Transfer failed for all targets.";
+        throw new IOException(error);
+      }
+
+      updatePositionInBlock(toReconstructLen);
+
+      clearBuffers();
+    }
+  }
+
+  private void reconstructTargets(int toReconstructLen) {
+    initDecoderIfNecessary();
+
+    ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
+
+    int[] erasedIndices = stripedWriter.getRealTargetIndices();
+    ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
+
+    getDecoder().decode(inputs, erasedIndices, outputs);
+
+    stripedWriter.updateRealTargetBuffers(toReconstructLen);
+  }
+
+  /**
+   * Clear all associated buffers.
+   */
+  private void clearBuffers() {
+    getStripedReader().clearBuffers();
+
+    stripedWriter.clearBuffers();
+  }
+}
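
The decode step in both reconstructors uses the same raw-coder contract: the inputs array covers every position of the block group with null for units that were not read, erasedIndexes lists the positions to rebuild, and outputs supplies one buffer per erased index. A sketch of preparing that call for a single missing unit, assuming a RawErasureDecoder obtained the way initDecoderIfNecessary() does:

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

  public class DecodeCallSketch {
    /**
     * Rebuild the single erased unit at erasedIndex from the units that were
     * read. Positions never read stay null in the inputs array; the decoder
     * only needs numDataUnits non-null inputs.
     */
    static ByteBuffer reconstructOne(RawErasureDecoder decoder,
        ByteBuffer[] readUnits, int erasedIndex, int cellLen) throws IOException {
      ByteBuffer[] inputs = readUnits.clone();
      inputs[erasedIndex] = null;                  // the unit we lost

      ByteBuffer[] outputs = new ByteBuffer[1];
      outputs[0] = ByteBuffer.allocate(cellLen);   // reconstructed data lands here

      decoder.decode(inputs, new int[] {erasedIndex}, outputs);
      return outputs[0];
    }
  }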

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index fb7699a..e6d4ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -23,11 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.util.DataChecksum;
@@ -85,8 +82,7 @@ class StripedReader {
   private final CompletionService<Void> readService;
 
   StripedReader(StripedReconstructor reconstructor, DataNode datanode,
-                Configuration conf,
-                BlockECReconstructionInfo reconstructionInfo) {
+      Configuration conf, StripedReconstructionInfo stripedReconInfo) {
     stripedReadTimeoutInMills = conf.getInt(
         DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
         DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
@@ -98,13 +94,11 @@ class StripedReader {
     this.datanode = datanode;
     this.conf = conf;
 
-    ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy();
-    dataBlkNum = ecPolicy.getNumDataUnits();
-    parityBlkNum = ecPolicy.getNumParityUnits();
+    dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();
+    parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();
 
-    ExtendedBlock blockGroup = reconstructionInfo.getExtendedBlock();
-    int cellsNum = (int)((blockGroup.getNumBytes() - 1) / ecPolicy.getCellSize()
-        + 1);
+    int cellsNum = (int) ((stripedReconInfo.getBlockGroup().getNumBytes() - 1)
+        / stripedReconInfo.getEcPolicy().getCellSize() + 1);
     minRequiredSources = Math.min(cellsNum, dataBlkNum);
 
     if (minRequiredSources < dataBlkNum) {
@@ -113,8 +107,10 @@ class StripedReader {
       zeroStripeIndices = new short[zeroStripNum];
     }
 
-    liveIndices = reconstructionInfo.getLiveBlockIndices();
-    sources = reconstructionInfo.getSourceDnInfos();
+    this.liveIndices = stripedReconInfo.getLiveIndices();
+    assert liveIndices != null;
+    this.sources = stripedReconInfo.getSources();
+    assert sources != null;
 
     readers = new ArrayList<>(sources.length);
     readService = reconstructor.createReadService();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
new file mode 100644
index 0000000..a5c328b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * Stores striped block info that can be used for block reconstruction.
+ */
+@InterfaceAudience.Private
+public class StripedReconstructionInfo {
+
+  private final ExtendedBlock blockGroup;
+  private final ErasureCodingPolicy ecPolicy;
+
+  // source info
+  private final byte[] liveIndices;
+  private final DatanodeInfo[] sources;
+
+  // target info
+  private final byte[] targetIndices;
+  private final DatanodeInfo[] targets;
+  private final StorageType[] targetStorageTypes;
+
+  public StripedReconstructionInfo(ExtendedBlock blockGroup,
+      ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
+      byte[] targetIndices) {
+    this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null);
+  }
+
+  StripedReconstructionInfo(ExtendedBlock blockGroup,
+      ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
+      DatanodeInfo[] targets, StorageType[] targetStorageTypes) {
+    this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
+        targetStorageTypes);
+  }
+
+  private StripedReconstructionInfo(ExtendedBlock blockGroup,
+      ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
+      byte[] targetIndices, DatanodeInfo[] targets,
+      StorageType[] targetStorageTypes) {
+
+    this.blockGroup = blockGroup;
+    this.ecPolicy = ecPolicy;
+    this.liveIndices = liveIndices;
+    this.sources = sources;
+    this.targetIndices = targetIndices;
+    this.targets = targets;
+    this.targetStorageTypes = targetStorageTypes;
+  }
+
+  ExtendedBlock getBlockGroup() {
+    return blockGroup;
+  }
+
+  ErasureCodingPolicy getEcPolicy() {
+    return ecPolicy;
+  }
+
+  byte[] getLiveIndices() {
+    return liveIndices;
+  }
+
+  DatanodeInfo[] getSources() {
+    return sources;
+  }
+
+  byte[] getTargetIndices() {
+    return targetIndices;
+  }
+
+  DatanodeInfo[] getTargets() {
+    return targets;
+  }
+
+  StorageType[] getTargetStorageTypes() {
+    return targetStorageTypes;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 47a6979..782d091 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@@ -39,6 +38,7 @@ import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * StripedReconstructor reconstruct one or more missed striped block in the
@@ -94,58 +94,50 @@ import java.util.concurrent.ExecutorCompletionService;
  *    reconstructed result received by targets?
  */
 @InterfaceAudience.Private
-class StripedReconstructor implements Runnable {
-  private static final Logger LOG = DataNode.LOG;
+abstract class StripedReconstructor {
+  protected static final Logger LOG = DataNode.LOG;
 
-  private final ErasureCodingWorker worker;
-  private final DataNode datanode;
   private final Configuration conf;
-
+  private final DataNode datanode;
   private final ErasureCodingPolicy ecPolicy;
-
   private RawErasureDecoder decoder;
-
   private final ExtendedBlock blockGroup;
-  private final BitSet liveBitSet;
 
   // position in striped internal block
   private long positionInBlock;
-
   private StripedReader stripedReader;
-
-  private StripedWriter stripedWriter;
-
+  private ThreadPoolExecutor stripedReadPool;
   private final CachingStrategy cachingStrategy;
+  private long maxTargetLength = 0L;
+  private final BitSet liveBitSet;
 
   StripedReconstructor(ErasureCodingWorker worker,
-                       BlockECReconstructionInfo reconstructionInfo) {
-    this.worker = worker;
+      StripedReconstructionInfo stripedReconInfo) {
+    this.stripedReadPool = worker.getStripedReadPool();
     this.datanode = worker.getDatanode();
     this.conf = worker.getConf();
-
-    ecPolicy = reconstructionInfo.getErasureCodingPolicy();
-
-    blockGroup = reconstructionInfo.getExtendedBlock();
-    byte[] liveIndices = reconstructionInfo.getLiveBlockIndices();
-    liveBitSet = new BitSet(ecPolicy.getNumDataUnits() +
-        ecPolicy.getNumParityUnits());
-    for (int i = 0; i < liveIndices.length; i++) {
-      liveBitSet.set(liveIndices[i]);
+    this.ecPolicy = stripedReconInfo.getEcPolicy();
+    liveBitSet = new BitSet(
+        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+    for (int i = 0; i < stripedReconInfo.getLiveIndices().length; i++) {
+      liveBitSet.set(stripedReconInfo.getLiveIndices()[i]);
     }
-
-    stripedReader = new StripedReader(this, datanode,
-        conf, reconstructionInfo);
-    stripedWriter = new StripedWriter(this, datanode,
-        conf, reconstructionInfo);
+    blockGroup = stripedReconInfo.getBlockGroup();
+    stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo);
 
     cachingStrategy = CachingStrategy.newDefaultStrategy();
 
     positionInBlock = 0L;
   }
 
-  BitSet getLiveBitSet() {
-    return liveBitSet;
-  }
+  /**
+   * Reconstruct one or more missing striped blocks in the striped block group;
+   * the number of live striped blocks must be no less than the number of data
+   * blocks.
+   *
+   * @throws IOException
+   */
+  abstract void reconstruct() throws IOException;
 
   ByteBuffer allocateBuffer(int length) {
     return ByteBuffer.allocate(length);
@@ -160,61 +152,8 @@ class StripedReconstructor implements Runnable {
         ecPolicy, i);
   }
 
-  boolean hasValidTargets() {
-    return stripedWriter.hasValidTargets();
-  }
-
-  @Override
-  public void run() {
-    datanode.incrementXmitsInProgress();
-    try {
-      stripedReader.init();
-
-      stripedWriter.init();
-
-      reconstructAndTransfer();
-
-      stripedWriter.endTargetBlocks();
-
-      // Currently we don't check the acks for packets, this is similar as
-      // block replication.
-    } catch (Throwable e) {
-      LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e);
-      datanode.getMetrics().incrECFailedReconstructionTasks();
-    } finally {
-      datanode.decrementXmitsInProgress();
-      datanode.getMetrics().incrECReconstructionTasks();
-      stripedReader.close();
-      stripedWriter.close();
-    }
-  }
-
-  void reconstructAndTransfer() throws IOException {
-    while (positionInBlock < stripedWriter.getMaxTargetLength()) {
-      long remaining = stripedWriter.getMaxTargetLength() - positionInBlock;
-      final int toReconstructLen =
-          (int) Math.min(stripedReader.getBufferSize(), remaining);
-      // step1: read from minimum source DNs required for reconstruction.
-      // The returned success list is the source DNs we do real read from
-      stripedReader.readMinimumSources(toReconstructLen);
-
-      // step2: decode to reconstruct targets
-      reconstructTargets(toReconstructLen);
-
-      // step3: transfer data
-      if (stripedWriter.transferData2Targets() == 0) {
-        String error = "Transfer failed for all targets.";
-        throw new IOException(error);
-      }
-
-      positionInBlock += toReconstructLen;
-
-      clearBuffers();
-    }
-  }
-
   // Initialize decoder
-  private void initDecoderIfNecessary() {
+  protected void initDecoderIfNecessary() {
     if (decoder == null) {
       ErasureCoderOptions coderOptions = new ErasureCoderOptions(
           ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
@@ -223,32 +162,10 @@ class StripedReconstructor implements Runnable {
     }
   }
 
-  private void reconstructTargets(int toReconstructLen) {
-    initDecoderIfNecessary();
-
-    ByteBuffer[] inputs = stripedReader.getInputBuffers(toReconstructLen);
-
-    int[] erasedIndices = stripedWriter.getRealTargetIndices();
-    ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
-
-    decoder.decode(inputs, erasedIndices, outputs);
-
-    stripedWriter.updateRealTargetBuffers(toReconstructLen);
-  }
-
   long getPositionInBlock() {
     return positionInBlock;
   }
 
-  /**
-   * Clear all associated buffers.
-   */
-  private void clearBuffers() {
-    stripedReader.clearBuffers();
-
-    stripedWriter.clearBuffers();
-  }
-
   InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
     return NetUtils.createSocketAddr(dnInfo.getXferAddr(
         datanode.getDnConf().getConnectToDnViaHostname()));
@@ -258,7 +175,7 @@ class StripedReconstructor implements Runnable {
     return stripedReader.getBufferSize();
   }
 
-  DataChecksum getChecksum() {
+  public DataChecksum getChecksum() {
     return stripedReader.getChecksum();
   }
 
@@ -267,10 +184,42 @@ class StripedReconstructor implements Runnable {
   }
 
   CompletionService<Void> createReadService() {
-    return new ExecutorCompletionService<>(worker.getStripedReadPool());
+    return new ExecutorCompletionService<>(stripedReadPool);
   }
 
   ExtendedBlock getBlockGroup() {
     return blockGroup;
   }
+
+  BitSet getLiveBitSet() {
+    return liveBitSet;
+  }
+
+  long getMaxTargetLength() {
+    return maxTargetLength;
+  }
+
+  void setMaxTargetLength(long maxTargetLength) {
+    this.maxTargetLength = maxTargetLength;
+  }
+
+  void updatePositionInBlock(long positionInBlockArg) {
+    this.positionInBlock += positionInBlockArg;
+  }
+
+  RawErasureDecoder getDecoder() {
+    return decoder;
+  }
+
+  StripedReader getStripedReader() {
+    return stripedReader;
+  }
+
+  Configuration getConf() {
+    return conf;
+  }
+
+  DataNode getDatanode() {
+    return datanode;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index e2052a3..ca7a3a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -22,11 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 
@@ -57,7 +55,6 @@ class StripedWriter {
   private final short[] targetIndices;
   private boolean hasValidTargets;
   private final StorageType[] targetStorageTypes;
-  private long maxTargetLength;
 
   private StripedBlockWriter[] writers;
 
@@ -67,20 +64,19 @@ class StripedWriter {
   private int bytesPerChecksum;
   private int checksumSize;
 
-  StripedWriter(StripedReconstructor reconstructor,
-                DataNode datanode,
-                Configuration conf,
-                BlockECReconstructionInfo reconstructionInfo) {
+  StripedWriter(StripedReconstructor reconstructor, DataNode datanode,
+      Configuration conf, StripedReconstructionInfo stripedReconInfo) {
     this.reconstructor = reconstructor;
     this.datanode = datanode;
     this.conf = conf;
 
-    ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy();
-    dataBlkNum = ecPolicy.getNumDataUnits();
-    parityBlkNum = ecPolicy.getNumParityUnits();
+    dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();
+    parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();
 
-    targets = reconstructionInfo.getTargetDnInfos();
-    targetStorageTypes = reconstructionInfo.getTargetStorageTypes();
+    this.targets = stripedReconInfo.getTargets();
+    assert targets != null;
+    this.targetStorageTypes = stripedReconInfo.getTargetStorageTypes();
+    assert targetStorageTypes != null;
 
     writers = new StripedBlockWriter[targets.length];
 
@@ -88,12 +84,12 @@ class StripedWriter {
     Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
         "Too much missed striped blocks.");
     initTargetIndices();
-
-    maxTargetLength = 0L;
+    long maxTargetLength = 0L;
     for (short targetIndex : targetIndices) {
       maxTargetLength = Math.max(maxTargetLength,
           reconstructor.getBlockLen(targetIndex));
     }
+    reconstructor.setMaxTargetLength(maxTargetLength);
 
     // targetsStatus store whether some target is success, it will record
     // any failed target once, if some target failed (invalid DN or transfer
@@ -126,7 +122,6 @@ class StripedWriter {
     BitSet bitset = reconstructor.getLiveBitSet();
 
     int m = 0;
-    int k = 0;
     hasValidTargets = false;
     for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
       if (!bitset.get(i)) {
@@ -257,10 +252,6 @@ class StripedWriter {
     }
   }
 
-  long getMaxTargetLength() {
-    return maxTargetLength;
-  }
-
   byte[] getChecksumBuf() {
     return checksumBuf;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index 598e76f..d223354 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -276,6 +277,52 @@ public class TestDecommissionWithStriped {
     cleanupFile(dfs, ecFile);
   }
 
+  /**
+   * Tests to verify that the file checksum can still be computed after the
+   * decommission operation.
+   *
+   * Below is the block indices list after the decommission. ' marks the
+   * decommissioned node's index.
+   *
+   * 0, 2, 3, 4, 5, 6, 7, 8, 1, 1'
+   *
+   * Here, the list contains duplicated blocks and does not maintain any
+   * order.
+   */
+  @Test(timeout = 120000)
+  public void testFileChecksumAfterDecommission() throws Exception {
+    LOG.info("Starting test testFileChecksumAfterDecommission");
+
+    final Path ecFile = new Path(ecDir, "testFileChecksumAfterDecommission");
+    int writeBytes = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
+    writeStripedFile(dfs, ecFile, writeBytes);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    final List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
+    LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
+        .get(0);
+    DatanodeInfo[] dnLocs = lb.getLocations();
+    assertEquals(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS, dnLocs.length);
+    int decommNodeIndex = 1;
+
+    // add the node which will be decommissioning
+    decommisionNodes.add(dnLocs[decommNodeIndex]);
+    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
+    assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes());
+    assertNull(checkFile(dfs, ecFile, 9, decommisionNodes, numDNs));
+    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
+        null);
+
+    // verify checksum
+    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+    LOG.info("fileChecksum1:" + fileChecksum1);
+    LOG.info("fileChecksum2:" + fileChecksum2);
+
+    Assert.assertTrue("Checksum mismatches!",
+        fileChecksum1.equals(fileChecksum2));
+  }
+
   private void testDecommission(int writeBytes, int storageCount,
       int decomNodeCount, String filename) throws IOException, Exception {
     Path ecFile = new Path(ecDir, filename);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
index 7cee344..3bee6be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
@@ -163,17 +163,40 @@ public class TestFileChecksum {
     Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
   }
 
-  /*
-  // TODO: allow datanode failure, HDFS-9833
   @Test
-  public void testStripedAndReplicatedWithFailure() throws Exception {
-    FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
-        10, true);
-    FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
-        10, true);
+  public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception {
+    FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize,
+        false);
+    FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1,
+        fileSize, true);
 
-    Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
-  }*/
+    LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
+    LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon);
+
+    Assert.assertTrue("Checksum mismatches!",
+        stripedFileChecksum1.equals(stripedFileChecksumRecon));
+  }
+
+  @Test
+  public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception {
+    FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1,
+        false);
+    FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, -1,
+        false);
+    FileChecksum stripedFileChecksum2Recon = getFileChecksum(stripedFile2, -1,
+        true);
+
+    LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
+    LOG.info("stripedFileChecksum2:" + stripedFileChecksum2);
+    LOG.info("stripedFileChecksum2Recon:" + stripedFileChecksum2Recon);
+
+    Assert.assertTrue("Checksum mismatches!",
+        stripedFileChecksum1.equals(stripedFileChecksum2));
+    Assert.assertTrue("Checksum mismatches!",
+        stripedFileChecksum1.equals(stripedFileChecksum2Recon));
+    Assert.assertTrue("Checksum mismatches!",
+        stripedFileChecksum2.equals(stripedFileChecksum2Recon));
+  }
 
   private FileChecksum getFileChecksum(String filePath, int range,
                                        boolean killDn) throws Exception {

