You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/26 21:34:39 UTC

[09/50] [abbrv] hadoop git commit: HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. Contributed by Yi Liu.

HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. Contributed by Yi Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6616de24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6616de24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6616de24

Branch: refs/heads/HDFS-7285
Commit: 6616de24cb14f1c2d0d6568fd4382062618834bd
Parents: 220ca96
Author: Zhe Zhang <zh...@apache.org>
Authored: Tue May 5 16:33:56 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Tue May 26 12:01:47 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../org/apache/hadoop/hdfs/BlockReader.java     |   6 +
 .../apache/hadoop/hdfs/BlockReaderLocal.java    |   5 +
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     |   5 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   6 +
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  10 +-
 .../apache/hadoop/hdfs/RemoteBlockReader.java   |   5 +
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  |   5 +
 .../hadoop/hdfs/server/datanode/DNConf.java     |  27 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |  31 +-
 .../erasurecode/ErasureCodingWorker.java        | 893 ++++++++++++++++++-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  49 +-
 .../src/main/resources/hdfs-default.xml         |  31 +-
 .../hadoop/hdfs/TestRecoverStripedFile.java     | 356 ++++++++
 14 files changed, 1377 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 7efaa5a..0d2d448 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -175,3 +175,6 @@
     
     HDFS-7672. Handle write failure for stripping blocks and refactor the
     existing code in DFSStripedOutputStream and StripedDataStreamer.  (szetszwo)
+
+    HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. 
+    (Yi Liu via Zhe Zhang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index aa3e8ba..0a5511e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.util.DataChecksum;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -99,4 +100,9 @@ public interface BlockReader extends ByteBufferReadable {
    *                      supported.
    */
   ClientMmap getClientMmap(EnumSet<ReadOption> opts);
+
+  /**
+   * @return              The DataChecksum used by the read block
+   */
+  DataChecksum getDataChecksum();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index d913f3a..0b2420d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -738,4 +738,9 @@ class BlockReaderLocal implements BlockReader {
   void forceUnanchorable() {
     replica.getSlot().makeUnanchorable();
   }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index c16ffdf..04cf733 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -732,4 +732,9 @@ class BlockReaderLocalLegacy implements BlockReader {
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6bc005b..d5d3095 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -369,6 +369,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
   public static final String  DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
   public static final int     DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
+  public static final String  DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads";
+  public static final int     DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
+  public static final String  DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
+  public static final int     DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 256 * 1024;
+  public static final String  DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
+  public static final int     DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s 
   public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String  DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String  DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 9cd1ec1..a26e35e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -37,7 +37,7 @@ import org.apache.htrace.Span;
  ****************************************************************/
 
 @InterfaceAudience.Private
-class DFSPacket {
+public class DFSPacket {
   public static final long HEART_BEAT_SEQNO = -1L;
   private static long[] EMPTY = new long[0];
   private final long seqno; // sequence number of buffer in block
@@ -80,7 +80,7 @@ class DFSPacket {
    * @param checksumSize the size of checksum
    * @param lastPacketInBlock if this is the last packet
    */
-  DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+  public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
                    int checksumSize, boolean lastPacketInBlock) {
     this.lastPacketInBlock = lastPacketInBlock;
     this.numChunks = 0;
@@ -114,7 +114,7 @@ class DFSPacket {
     dataPos += len;
   }
 
-  synchronized void writeData(ByteBuffer inBuffer, int len)
+  public synchronized void writeData(ByteBuffer inBuffer, int len)
       throws ClosedChannelException {
     checkBuffer();
     len =  len > inBuffer.remaining() ? inBuffer.remaining() : len;
@@ -135,7 +135,7 @@ class DFSPacket {
    * @param len the length of checksums to write
    * @throws ClosedChannelException
    */
-  synchronized void writeChecksum(byte[] inarray, int off, int len)
+  public synchronized void writeChecksum(byte[] inarray, int off, int len)
       throws ClosedChannelException {
     checkBuffer();
     if (len == 0) {
@@ -154,7 +154,7 @@ class DFSPacket {
    * @param stm
    * @throws IOException
    */
-  synchronized void writeTo(DataOutputStream stm) throws IOException {
+  public synchronized void writeTo(DataOutputStream stm) throws IOException {
     checkBuffer();
 
     final int dataLen = dataPos - dataStart;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index d70f419..70cce7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -505,4 +505,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index c368d65..cce44b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -474,4 +474,9 @@ public class RemoteBlockReader2  implements BlockReader {
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4b7fbc3..d25642f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -236,6 +236,33 @@ public class DNConf {
   }
 
   /**
+   * Returns true if connect to datanode via hostname
+   * 
+   * @return boolean true if connect to datanode via hostname
+   */
+  public boolean getConnectToDnViaHostname() {
+    return connectToDnViaHostname;
+  }
+
+  /**
+   * Returns socket timeout
+   * 
+   * @return int socket timeout
+   */
+  public int getSocketTimeout() {
+    return socketTimeout;
+  }
+
+  /**
+   * Returns socket write timeout
+   * 
+   * @return int socket write timeout
+   */
+  public int getSocketWriteTimeout() {
+    return socketWriteTimeout;
+  }
+
+  /**
    * Returns the SaslPropertiesResolver configured for use with
    * DataTransferProtocol, or null if not configured.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 221ba38..5eca2c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1161,7 +1161,8 @@ public class DataNode extends ReconfigurableBase
     saslClient = new SaslDataTransferClient(dnConf.conf, 
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
-    ecWorker = new ErasureCodingWorker(conf); // Initialize ErasureCoding worker
+    // Initialize ErasureCoding worker
+    ecWorker = new ErasureCodingWorker(conf, this);
   }
 
   /**
@@ -1226,6 +1227,10 @@ public class DataNode extends ReconfigurableBase
     return UUID.randomUUID().toString();
   }
 
+  public SaslDataTransferClient getSaslClient() {
+    return saslClient;
+  }
+
   /**
    * Verify that the DatanodeUuid has been initialized. If this is a new
    * datanode then we generate a new Datanode Uuid and persist it to disk.
@@ -1488,7 +1493,7 @@ public class DataNode extends ReconfigurableBase
   /**
    * Creates either NIO or regular depending on socketWriteTimeout.
    */
-  protected Socket newSocket() throws IOException {
+  public Socket newSocket() throws IOException {
     return (dnConf.socketWriteTimeout > 0) ? 
            SocketChannel.open().socket() : new Socket();                                   
   }
@@ -2143,11 +2148,8 @@ public class DataNode extends ReconfigurableBase
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-        if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b, 
-              EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
-        }
+        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b, 
+            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2214,6 +2216,19 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  /***
+   * Use BlockTokenSecretManager to generate block token for current user.
+   */
+  public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
+      EnumSet<AccessMode> mode) throws IOException {
+    Token<BlockTokenIdentifier> accessToken = 
+        BlockTokenSecretManager.DUMMY_TOKEN;
+    if (isBlockTokenEnabled) {
+      accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
+    }
+    return accessToken;
+  }
+
   /**
    * Returns a new DataEncryptionKeyFactory that generates a key from the
    * BlockPoolTokenSecretManager, using the block pool ID of the given block.
@@ -2221,7 +2236,7 @@ public class DataNode extends ReconfigurableBase
    * @param block for which the factory needs to create a key
    * @return DataEncryptionKeyFactory for block's block pool ID
    */
-  DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+  public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
       final ExtendedBlock block) {
     return new DataEncryptionKeyFactory() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 6430308..c4e568f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -17,15 +17,68 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSPacket;
+import org.apache.hadoop.hdfs.RemoteBlockReader2;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
-import org.apache.hadoop.io.erasurecode.coder.AbstractErasureCoder;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
 
 /**
  * ErasureCodingWorker handles the erasure coding recovery work commands. These
@@ -34,41 +87,60 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
  * commands.
  */
 public final class ErasureCodingWorker {
-
+  private final Log LOG = DataNode.LOG;
+  
+  private final DataNode datanode; 
   private Configuration conf;
-  RawErasureCoder rawEncoder = null;
-  RawErasureCoder rawDecoder = null;
 
-  public ErasureCodingWorker(Configuration conf) {
+  private ThreadPoolExecutor STRIPED_READ_TRHEAD_POOL;
+  private final int STRIPED_READ_THRESHOLD_MILLIS;
+  private final int STRIPED_READ_BUFFER_SIZE;
+
+  public ErasureCodingWorker(Configuration conf, DataNode datanode) {
+    this.datanode = datanode;
     this.conf = conf;
-    initialize();
-  }
 
-  /**
-   * Initializes the required resources for handling the erasure coding recovery
-   * work.
-   */
-  public void initialize() {
-    // Right now directly used RS coder. Once other coders integration ready, we
-    // can load preferred codec here.
-    initializeErasureEncoder();
-    initializeErasureDecoder();
+    STRIPED_READ_THRESHOLD_MILLIS = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY,
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT);
+    initializeStripedReadThreadPool(conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, 
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
+    STRIPED_READ_BUFFER_SIZE = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
   }
 
-  private void initializeErasureDecoder() {
-    rawDecoder = AbstractErasureCoder.createRawCoder(conf,
-        CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, false);
-    if (rawDecoder == null) {
-      rawDecoder = new RSRawDecoder();
-    }
+  private RawErasureEncoder newEncoder() {
+    return new RSRawEncoder();
+  }
+  
+  private RawErasureDecoder newDecoder() {
+    return new RSRawDecoder();
   }
 
-  private void initializeErasureEncoder() {
-    rawEncoder = AbstractErasureCoder.createRawCoder(conf,
-        CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, true);
-    if (rawEncoder == null) {
-      rawEncoder = new RSRawEncoder();
-    }
+  private void initializeStripedReadThreadPool(int num) {
+    STRIPED_READ_TRHEAD_POOL = new ThreadPoolExecutor(1, num, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new Daemon.DaemonFactory() {
+      private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = super.newThread(r);
+        t.setName("stripedRead-" + threadIndex.getAndIncrement());
+        return t;
+      }
+    }, new ThreadPoolExecutor.CallerRunsPolicy() {
+      @Override
+      public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+        LOG.info("Execution for striped reading rejected, "
+            + "Executing in current thread");
+        // will run in the current thread
+        super.rejectedExecution(runnable, e);
+      }
+    });
+    STRIPED_READ_TRHEAD_POOL.allowCoreThreadTimeOut(true);
   }
 
   /**
@@ -78,6 +150,765 @@ public final class ErasureCodingWorker {
    *          BlockECRecoveryInfo
    */
   public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
-    // HDFS-7348 : Implement the actual recovery process
+    for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
+      try {
+        new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start();
+      } catch (Throwable e) {
+        LOG.warn("Failed to recover striped block " + 
+            recoveryInfo.getExtendedBlock().getLocalBlock(), e);
+      }
+    }
+  }
+
+  /**
+   * ReconstructAndTransferBlock recover one or more missed striped block in the
+   * striped block group, the minimum number of live striped blocks should be
+   * no less than data block number.
+   * 
+   * | <- Striped Block Group -> |
+   *  blk_0      blk_1       blk_2(*)   blk_3   ...   <- A striped block group
+   *    |          |           |          |  
+   *    v          v           v          v 
+   * +------+   +------+   +------+   +------+
+   * |cell_0|   |cell_1|   |cell_2|   |cell_3|  ...    
+   * +------+   +------+   +------+   +------+     
+   * |cell_4|   |cell_5|   |cell_6|   |cell_7|  ...
+   * +------+   +------+   +------+   +------+
+   * |cell_8|   |cell_9|   |cell10|   |cell11|  ...
+   * +------+   +------+   +------+   +------+
+   *  ...         ...       ...         ...
+   *  
+   * 
+   * We use following steps to recover striped block group, in each round, we
+   * recover <code>bufferSize</code> data until finish, the 
+   * <code>bufferSize</code> is configurable and may be less or larger than 
+   * cell size:
+   * step1: read <code>bufferSize</code> data from minimum number of sources 
+   *        required by recovery.
+   * step2: decode data for targets.
+   * step3: transfer data to targets.
+   * 
+   * In step1, try to read <code>bufferSize</code> data from minimum number
+   * of sources , if there is corrupt or stale sources, read from new source
+   * will be scheduled. The best sources are remembered for next round and 
+   * may be updated in each round.
+   * 
+   * In step2, typically if source blocks we read are all data blocks, we 
+   * need to call encode, and if there is one parity block, we need to call
+   * decode. Notice we only read once and recover all missed striped block 
+   * if they are more than one.
+   * 
+   * In step3, send the recovered data to targets by constructing packet 
+   * and send them directly. Same as continuous block replication, we 
+   * don't check the packet ack. Since the datanode doing the recovery work
+   * are one of the source datanodes, so the recovered data are sent 
+   * remotely.
+   * 
+   * There are some points we can do further improvements in next phase:
+   * 1. we can read the block file directly on the local datanode, 
+   *    currently we use remote block reader. (Notice short-circuit is not
+   *    a good choice, see inline comments).
+   * 2. We need to check the packet ack for EC recovery? Since EC recovery
+   *    is more expensive than continuous block replication, it needs to 
+   *    read from several other datanodes, should we make sure the 
+   *    recovered result received by targets? 
+   */
+  private class ReconstructAndTransferBlock implements Runnable {
+    private final int dataBlkNum;
+    private final int parityBlkNum;
+    private final int cellSize;
+    
+    private RawErasureEncoder encoder;
+    private RawErasureDecoder decoder;
+
+    // Striped read buffer size
+    private int bufferSize;
+
+    private final ExtendedBlock blockGroup;
+    // position in striped block
+    private long positionInBlock;
+
+    // sources
+    private final short[] liveIndices;
+    private DatanodeInfo[] sources;
+
+    private List<StripedReader> stripedReaders;
+
+    // targets
+    private DatanodeInfo[] targets;
+    private StorageType[] targetStorageTypes;
+
+    private short[] targetIndices;
+    private ByteBuffer[] targetBuffers;
+
+    private Socket[] targetSockets;
+    private DataOutputStream[] targetOutputStreams;
+    private DataInputStream[] targetInputStreams;
+
+    private long[] blockOffset4Targets;
+    private long[] seqNo4Targets;
+
+    private final int WRITE_PACKET_SIZE = 64 * 1024;
+    private DataChecksum checksum;
+    private int maxChunksPerPacket;
+    private byte[] packetBuf;
+    private byte[] checksumBuf;
+    private int bytesPerChecksum;
+    private int checksumSize;
+
+    private CachingStrategy cachingStrategy;
+
+    private Map<Future<Void>, Integer> futures = new HashMap<>();
+    private CompletionService<Void> readService =
+        new ExecutorCompletionService<>(STRIPED_READ_TRHEAD_POOL);
+
+    ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
+      ECSchema schema = recoveryInfo.getECSchema();
+      dataBlkNum = schema.getNumDataUnits();
+      parityBlkNum = schema.getNumParityUnits();
+      cellSize = schema.getChunkSize();
+
+      blockGroup = recoveryInfo.getExtendedBlock();
+
+      liveIndices = recoveryInfo.getLiveBlockIndices();
+      sources = recoveryInfo.getSourceDnInfos();
+      stripedReaders = new ArrayList<>(sources.length);
+
+      Preconditions.checkArgument(liveIndices.length >= dataBlkNum,
+          "No enough live striped blocks.");
+      Preconditions.checkArgument(liveIndices.length == sources.length);
+
+      targets = recoveryInfo.getTargetDnInfos();
+      targetStorageTypes = recoveryInfo.getTargetStorageTypes();
+      targetIndices = new short[targets.length];
+      targetBuffers = new ByteBuffer[targets.length];
+
+      targetSockets = new Socket[targets.length];
+      targetOutputStreams = new DataOutputStream[targets.length];
+      targetInputStreams = new DataInputStream[targets.length];
+
+      blockOffset4Targets = new long[targets.length];
+      seqNo4Targets = new long[targets.length];
+
+      for (int i = 0; i < targets.length; i++) {
+        blockOffset4Targets[i] = 0;
+        seqNo4Targets[i] = 0;
+      }
+
+      getTargetIndices();
+      cachingStrategy = CachingStrategy.newDefaultStrategy();
+    }
+
+    private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
+      return StripedBlockUtil.constructStripedBlock(blockGroup, cellSize,
+          dataBlkNum, i);
+    }
+
+    private long getBlockLen(ExtendedBlock blockGroup, int i) { 
+      return StripedBlockUtil.getStripedBlockLength(blockGroup.getNumBytes(),
+          cellSize, dataBlkNum, i);
+    }
+
+    @Override
+    public void run() {
+      try {
+        // Store the indices of successfully read source
+        // This will be updated after doing real read.
+        int[] success = new int[dataBlkNum];
+
+        int nsuccess = 0;
+        for (int i = 0; i < sources.length && nsuccess < dataBlkNum; i++) {
+          StripedReader reader = new StripedReader(liveIndices[i]);
+          stripedReaders.add(reader);
+
+          BlockReader blockReader = newBlockReader(
+              getBlock(blockGroup, liveIndices[i]), 0, sources[i]);
+          if (blockReader != null) {
+            initChecksumAndBufferSizeIfNeeded(blockReader);
+            reader.blockReader = blockReader;
+            reader.buffer = ByteBuffer.allocate(bufferSize);
+            success[nsuccess++] = i;
+          }
+        }
+
+        if (nsuccess < dataBlkNum) {
+          String error = "Can't find minimum sources required by "
+              + "recovery, block id: " + blockGroup.getBlockId();
+          LOG.warn(error);
+          throw new IOException(error);
+        }
+
+        for (int i = 0; i < targets.length; i++) {
+          targetBuffers[i] = ByteBuffer.allocate(bufferSize);
+        }
+
+        checksumSize = checksum.getChecksumSize();
+        int chunkSize = bytesPerChecksum + checksumSize;
+        maxChunksPerPacket = Math.max(
+            (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
+        int maxPacketSize = chunkSize * maxChunksPerPacket 
+            + PacketHeader.PKT_MAX_HEADER_LEN;
+
+        packetBuf = new byte[maxPacketSize];
+        checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];
+
+        // Store whether the target is success
+        boolean[] targetsStatus = new boolean[targets.length];
+        if (initTargetStreams(targetsStatus) == 0) {
+          String error = "All targets are failed.";
+          LOG.warn(error);
+          throw new IOException(error);
+        }
+
+        long firstStripedBlockLength = getBlockLen(blockGroup, 0);
+        while (positionInBlock < firstStripedBlockLength) {
+          int toRead = Math.min(
+              bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+          // step1: read minimum striped buffer size data required by recovery.
+          nsuccess = readMinimumStripedData4Recovery(success);
+
+          if (nsuccess < dataBlkNum) {
+            String error = "Can't read data from minimum number of sources "
+                + "required by recovery, block id: " + blockGroup.getBlockId();
+            LOG.warn(error);
+            throw new IOException(error);
+          }
+
+          // step2: encode/decode to recover targets
+          long remaining = firstStripedBlockLength - positionInBlock;
+          int toRecoverLen = remaining < bufferSize ? 
+              (int)remaining : bufferSize;
+          recoverTargets(success, targetsStatus, toRecoverLen);
+
+          // step3: transfer data
+          if (transferData2Targets(targetsStatus) == 0) {
+            String error = "Transfer failed for all targets.";
+            LOG.warn(error);
+            throw new IOException(error);
+          }
+
+          clearBuffers();
+          positionInBlock += toRead;
+        }
+
+        endTargetBlocks(targetsStatus);
+
+        // Currently we don't check the acks for packets, this is similar as
+        // block replication.
+      } catch (Throwable e) {
+        LOG.warn("Failed to recover striped block: " + blockGroup);
+      } finally {
+        // close block readers
+        for (StripedReader stripedReader : stripedReaders) {
+          closeBlockReader(stripedReader.blockReader);
+        }
+        for (int i = 0; i < targets.length; i++) {
+          IOUtils.closeStream(targetOutputStreams[i]);
+          IOUtils.closeStream(targetInputStreams[i]);
+          IOUtils.closeStream(targetSockets[i]);
+        }
+      }
+    }
+
+    // init checksum from block reader
+    private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
+      if (checksum == null) {
+        checksum = blockReader.getDataChecksum();
+        bytesPerChecksum = checksum.getBytesPerChecksum();
+        // The bufferSize is flat to divide bytesPerChecksum
+        int readBufferSize = STRIPED_READ_BUFFER_SIZE;
+        bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
+          readBufferSize - readBufferSize % bytesPerChecksum;
+      } else {
+        assert blockReader.getDataChecksum().equals(checksum);
+      }
+    }
+
+    // assume liveIndices is not ordered.
+    private void getTargetIndices() {
+      BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
+      for (int i = 0; i < sources.length; i++) {
+        bitset.set(liveIndices[i]);
+      }
+      int m = 0;
+      for (int i = 0; i < dataBlkNum + parityBlkNum && m < targets.length; i++) {
+        if (!bitset.get(i)) {
+          targetIndices[m++] = (short)i;
+        }
+      }
+    }
+
+    /**
+     * Read minimum striped buffer size data required by recovery.
+     * <code>success</code> list will be updated after read.
+     * 
+     * Initially we only read from <code>dataBlkNum</code> sources, 
+     * if timeout or failure for some source, we will try to schedule 
+     * read from a new source. 
+     */
+    private int readMinimumStripedData4Recovery(int[] success) {
+
+      BitSet used = new BitSet(sources.length);
+      for (int i = 0; i < dataBlkNum; i++) {
+        StripedReader reader = stripedReaders.get(success[i]);
+        Callable<Void> readCallable = readFromBlock(
+            reader.blockReader, reader.buffer);
+        Future<Void> f = readService.submit(readCallable);
+        futures.put(f, success[i]);
+        used.set(success[i]);
+      }
+
+      int nsuccess = 0;
+      while (!futures.isEmpty()) {
+        try {
+          StripedReadResult result = 
+              StripedBlockUtil.getNextCompletedStripedRead(
+                  readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
+          if (result.state == StripedReadResult.SUCCESSFUL) {
+            success[nsuccess++] = result.index;
+            if (nsuccess >= dataBlkNum) {
+              // cancel remaining reads if we read successfully from minimum
+              // number of sources required for recovery.
+              cancelReads(futures.keySet());
+              futures.clear();
+              break;
+            }
+          } else if (result.state == StripedReadResult.FAILED) {
+            // If read failed for some source, we should not use it anymore 
+            // and schedule read from a new source.
+            StripedReader failedReader = stripedReaders.get(result.index);
+            closeBlockReader(failedReader.blockReader);
+            failedReader.blockReader = null;
+            scheduleNewRead(used);
+          } else if (result.state == StripedReadResult.TIMEOUT) {
+            // If timeout, we also schedule a new read.
+            scheduleNewRead(used);
+          }
+        } catch (InterruptedException e) {
+          LOG.info("Read data interrupted.", e);
+          break;
+        }
+      }
+
+      return nsuccess;
+    }
+
+    /**
+     * Return true if need to do encoding to recovery missed striped block.
+     */
+    private boolean shouldEncode(int[] success) {
+      for (int i = 0; i < success.length; i++) {
+        if (stripedReaders.get(success[i]).index >= dataBlkNum) {
+          return false;
+        }
+      }
+      return true;
+    }
+    
+    private void paddingBufferToLen(ByteBuffer buffer, int len) {
+      int toPadding = len - buffer.position();
+      for (int i = 0; i < toPadding; i++) {
+        buffer.put((byte) 0);
+      }
+    }
+    
+    // Initialize encoder
+    private void initEncoderIfNecessary() {
+      if (encoder == null) {
+        encoder = newEncoder();
+        encoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
+      }
+    }
+    
+    // Initialize decoder
+    private void initDecoderIfNecessary() {
+      if (decoder == null) {
+        decoder = newDecoder();
+        decoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
+      }
+    }
+
+    private void recoverTargets(int[] success, boolean[] targetsStatus,
+        int toRecoverLen) {
+      if (shouldEncode(success)) {
+        initEncoderIfNecessary();
+        ByteBuffer[] dataBuffers = new ByteBuffer[dataBlkNum];
+        ByteBuffer[] parityBuffers = new ByteBuffer[parityBlkNum];
+        for (int i = 0; i < dataBlkNum; i++) {
+          StripedReader reader = stripedReaders.get(i);
+          ByteBuffer buffer = reader.buffer;
+          paddingBufferToLen(buffer, toRecoverLen);
+          dataBuffers[i] = (ByteBuffer)buffer.flip();
+        }
+        for (int i = dataBlkNum; i < stripedReaders.size(); i++) {
+          StripedReader reader = stripedReaders.get(i);
+          parityBuffers[reader.index - dataBlkNum] = cleanBuffer(reader.buffer);
+        }
+        for (int i = 0; i < targets.length; i++) {
+          parityBuffers[targetIndices[i] - dataBlkNum] = targetBuffers[i];
+        }
+        for (int i = 0; i < parityBlkNum; i++) {
+          if (parityBuffers[i] == null) {
+            parityBuffers[i] = ByteBuffer.allocate(toRecoverLen);
+          } else {
+            parityBuffers[i].limit(toRecoverLen);
+          }
+        }
+        encoder.encode(dataBuffers, parityBuffers);
+      } else {
+        /////////// TODO: wait for HADOOP-11847 /////////////
+        ////////// The current decode method always try to decode parityBlkNum number of data blocks. ////////////
+        initDecoderIfNecessary();
+        ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+        for (int i = 0; i < success.length; i++) {
+          StripedReader reader = stripedReaders.get(success[i]);
+          ByteBuffer buffer = reader.buffer;
+          paddingBufferToLen(buffer, toRecoverLen);
+          int index = reader.index < dataBlkNum ? 
+              reader.index + parityBlkNum : reader.index - dataBlkNum;
+          inputs[index] = (ByteBuffer)buffer.flip();
+        }
+        int[] indices4Decode = new int[parityBlkNum];
+        int m = 0;
+        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+          if (inputs[i] == null) {
+            inputs[i] = ByteBuffer.allocate(toRecoverLen);
+            indices4Decode[m++] = i;
+          }
+        }
+        ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
+        m = 0;
+        // targetIndices is subset of indices4Decode
+        for (int i = 0; i < parityBlkNum; i++) {
+          if (m < targetIndices.length && 
+              (indices4Decode[i] - parityBlkNum) == targetIndices[m]) {
+            outputs[i] = targetBuffers[m++];
+            outputs[i].limit(toRecoverLen);
+          } else {
+            outputs[i] = ByteBuffer.allocate(toRecoverLen);
+          }
+        }
+        
+        decoder.decode(inputs, indices4Decode, outputs);
+        
+        for (int i = 0; i < targets.length; i++) {
+          if (targetsStatus[i]) {
+            long blockLen = getBlockLen(blockGroup, targetIndices[i]);
+            long remaining = blockLen - positionInBlock;
+            if (remaining < 0) {
+              targetBuffers[i].limit(0);
+            } else if (remaining < toRecoverLen) {
+              targetBuffers[i].limit((int)remaining);
+            }
+          }
+        }
+      }
+    }
+
+    /** 
+     * Schedule read from a new source, we first try un-initial source, 
+     * then try un-used source in this round and bypass failed source.
+     */
+    private void scheduleNewRead(BitSet used) {
+      StripedReader reader = null;
+      int m = stripedReaders.size();
+      while (m < sources.length && reader == null) {
+        reader = new StripedReader(liveIndices[m]);
+        BlockReader blockReader = newBlockReader(
+            getBlock(blockGroup, liveIndices[m]), positionInBlock, sources[m]);
+        stripedReaders.add(reader);
+        if (blockReader != null) {
+          assert blockReader.getDataChecksum().equals(checksum);
+          reader.blockReader = blockReader;
+          reader.buffer = ByteBuffer.allocate(bufferSize);
+        } else {
+          m++;
+          reader = null;
+        }
+      }
+
+      for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
+        StripedReader r = stripedReaders.get(i);
+        if (r.blockReader != null && !used.get(i)) {
+          closeBlockReader(r.blockReader);
+          r.blockReader = newBlockReader(
+              getBlock(blockGroup, liveIndices[i]), positionInBlock,
+              sources[i]);
+          if (r.blockReader != null) {
+            m = i;
+            reader = r;
+          }
+        }
+      }
+
+      if (reader != null) {
+        Callable<Void> readCallable = readFromBlock(
+            reader.blockReader, reader.buffer);
+        Future<Void> f = readService.submit(readCallable);
+        futures.put(f, m);
+        used.set(m);
+      }
+    }
+
+    // cancel all reads.
+    private void cancelReads(Collection<Future<Void>> futures) {
+      for (Future<Void> future : futures) {
+        future.cancel(true);
+      }
+    }
+
+    private Callable<Void> readFromBlock(final BlockReader reader,
+        final ByteBuffer buf) {
+      return new Callable<Void>() {
+
+        @Override
+        public Void call() throws Exception {
+          try {
+            actualReadFromBlock(reader, buf);
+            return null;
+          } catch (IOException e) {
+            LOG.info(e.getMessage());
+            throw e;
+          }
+        }
+
+      };
+    }
+
+    /**
+     * Read bytes from block
+     */
+    private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
+        throws IOException {
+      int len = buf.remaining();
+      int n = 0;
+      while (n < len) {
+        int nread = reader.read(buf);
+        if (nread <= 0) {
+          break;
+        }
+        n += nread;
+      }
+    }
+
+    // close block reader
+    private void closeBlockReader(BlockReader blockReader) {
+      try {
+        if (blockReader != null) {
+          blockReader.close();
+        }
+      } catch (IOException e) {
+        // ignore
+      }
+    }
+
+    private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
+      return NetUtils.createSocketAddr(dnInfo.getXferAddr(
+          datanode.getDnConf().getConnectToDnViaHostname()));
+    }
+
+    private BlockReader newBlockReader(final ExtendedBlock block, 
+        long startOffset, DatanodeInfo dnInfo) {
+      try {
+        InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
+        Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
+            block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
+        /*
+         * This can be further improved if the replica is local, then we can
+         * read directly from DN and need to check the replica is FINALIZED
+         * state, notice we should not use short-circuit local read which
+         * requires config for domain-socket in UNIX or legacy config in Windows.
+         */
+        return RemoteBlockReader2.newBlockReader(
+            "dummy", block, blockToken, startOffset, block.getNumBytes(), true,
+            "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
+            null, cachingStrategy);
+      } catch (IOException e) {
+        return null;
+      }
+    }
+
+    private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
+        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+        throws IOException {
+      Peer peer = null;
+      boolean success = false;
+      Socket sock = null;
+      final int socketTimeout = datanode.getDnConf().getSocketTimeout(); 
+      try {
+        sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+        NetUtils.connect(sock, addr, socketTimeout);
+        peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(), 
+            sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
+            blockToken, datanodeId);
+        peer.setReadTimeout(socketTimeout);
+        success = true;
+        return peer;
+      } finally {
+        if (!success) {
+          IOUtils.cleanup(LOG, peer);
+          IOUtils.closeSocket(sock);
+        }
+      }
+    }
+
+    /**
+     * Send data to targets
+     */
+    private int transferData2Targets(boolean[] targetsStatus) {
+      int nsuccess = 0;
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          boolean success = false;
+          try {
+            ByteBuffer buffer = targetBuffers[i];
+            
+            if (buffer.remaining() == 0) {
+              continue;
+            }
+
+            checksum.calculateChunkedSums(
+                buffer.array(), 0, buffer.remaining(), checksumBuf, 0);
+
+            int ckOff = 0;
+            while (buffer.remaining() > 0) {
+              DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
+                  blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false);
+              int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
+              int toWrite = buffer.remaining() > maxBytesToPacket ?
+                  maxBytesToPacket : buffer.remaining();
+              int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
+              packet.writeChecksum(checksumBuf, ckOff, ckLen);
+              ckOff += ckLen;
+              packet.writeData(buffer, toWrite);
+
+              // Send packet
+              packet.writeTo(targetOutputStreams[i]);
+
+              blockOffset4Targets[i] += toWrite;
+              nsuccess++;
+              success = true;
+            }
+          } catch (IOException e) {
+            LOG.warn(e.getMessage());
+          }
+          targetsStatus[i] = success;
+        }
+      }
+      return nsuccess;
+    }
+
+    /**
+     * clear all buffers
+     */
+    private void clearBuffers() {
+      for (StripedReader stripedReader : stripedReaders) {
+        if (stripedReader.buffer != null) {
+          stripedReader.buffer.clear();
+        }
+      }
+
+      for (int i = 0; i < targetBuffers.length; i++) {
+        if (targetBuffers[i] != null) {
+          cleanBuffer(targetBuffers[i]);
+        }
+      }
+    }
+    
+    private ByteBuffer cleanBuffer(ByteBuffer buffer) {
+      Arrays.fill(buffer.array(), (byte) 0);
+      return (ByteBuffer)buffer.clear();
+    }
+
+    // send an empty packet to mark the end of the block
+    private void endTargetBlocks(boolean[] targetsStatus) {
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          try {
+            DFSPacket packet = new DFSPacket(packetBuf, 0, 
+                blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
+            packet.writeTo(targetOutputStreams[i]);
+            targetOutputStreams[i].flush();
+          } catch (IOException e) {
+            LOG.warn(e.getMessage());
+          }
+        }
+      }
+    }
+
+    /**
+     * Initialize  output/input streams for transferring data to target
+     * and send create block request. 
+     */
+    private int initTargetStreams(boolean[] targetsStatus) {
+      int nsuccess = 0;
+      for (int i = 0; i < targets.length; i++) {
+        Socket socket = null;
+        DataOutputStream out = null;
+        DataInputStream in = null;
+        boolean success = false;
+        try {
+          InetSocketAddress targetAddr = 
+              getSocketAddress4Transfer(targets[i]);
+          socket = datanode.newSocket();
+          NetUtils.connect(socket, targetAddr, 
+              datanode.getDnConf().getSocketTimeout());
+          socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
+
+          ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
+          Token<BlockTokenIdentifier> blockToken = 
+              datanode.getBlockAccessToken(block,
+                  EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+
+          long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
+          OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
+          InputStream unbufIn = NetUtils.getInputStream(socket);
+          DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+          IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
+              socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);
+
+          unbufOut = saslStreams.out;
+          unbufIn = saslStreams.in;
+
+          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+              HdfsServerConstants.SMALL_BUFFER_SIZE));
+          in = new DataInputStream(unbufIn);
+
+          DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
+          new Sender(out).writeBlock(block, targetStorageTypes[i], 
+              blockToken, "", new DatanodeInfo[]{targets[i]}, 
+              new StorageType[]{targetStorageTypes[i]}, source, 
+              BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, 
+              checksum, cachingStrategy, false, false, null);
+
+          targetSockets[i] = socket;
+          targetOutputStreams[i] = out;
+          targetInputStreams[i] = in;
+          nsuccess++;
+          success = true;
+        } catch (Throwable e) {
+          LOG.warn(e.getMessage());
+        } finally {
+          if (!success) {
+            IOUtils.closeStream(out);
+            IOUtils.closeStream(in);
+            IOUtils.closeStream(socket);
+          }
+        }
+        targetsStatus[i] = success;
+      }
+      return nsuccess;
+    }
+  }
+
+  private class StripedReader {
+    short index;
+    BlockReader blockReader;
+    ByteBuffer buffer;
+
+    public StripedReader(short index) {
+      this.index = index;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 24d4bfb..45bbf6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.util;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -77,10 +78,8 @@ public class StripedBlockUtil {
   public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
       int idxInReturnedLocs, int cellSize, int dataBlkNum,
       int idxInBlockGroup) {
-    final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
-    blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
-    blk.setNumBytes(getInternalBlockLength(bg.getBlockSize(),
-        cellSize, dataBlkNum, idxInBlockGroup));
+    final ExtendedBlock blk = constructInternalBlock(
+        bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
 
     return new LocatedBlock(blk,
         new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
@@ -91,6 +90,44 @@ public class StripedBlockUtil {
   }
 
   /**
+   * This method creates an internal {@link ExtendedBlock} at the given index
+   * of a block group.
+   */
+  public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup,
+      int cellSize, int dataBlkNum, int idxInBlockGroup) {
+    ExtendedBlock block = new ExtendedBlock(blockGroup);
+    block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
+    block.setNumBytes(getInternalBlockLength(blockGroup.getNumBytes(),
+        cellSize, dataBlkNum, idxInBlockGroup));
+    return block;
+  }
+  
+  /**
+   * This method creates an internal {@link ExtendedBlock} at the given index
+   * of a block group, for both data and parity block.
+   */
+  public static ExtendedBlock constructStripedBlock(ExtendedBlock blockGroup,
+      int cellSize, int dataBlkNum, int idxInBlockGroup) {
+    ExtendedBlock block = new ExtendedBlock(blockGroup);
+    block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
+    block.setNumBytes(getStripedBlockLength(blockGroup.getNumBytes(), cellSize,
+        dataBlkNum, idxInBlockGroup));
+    return block;
+  }
+
+  /**
+   * Returns an internal block length at the given index of a block group,
+   * for both data and parity block.
+   */
+  public static long getStripedBlockLength(long numBytes, int cellSize,
+      int dataBlkNum, int idxInBlockGroup) {
+    // parity block length is the same as the first striped block length. 
+    return StripedBlockUtil.getInternalBlockLength(
+        numBytes, cellSize, dataBlkNum, 
+        idxInBlockGroup < dataBlkNum ? idxInBlockGroup : 0);
+  }
+
+  /**
    * Get the size of an internal block at the given index of a block group
    *
    * @param dataSize Size of the block group only counting data blocks
@@ -208,8 +245,8 @@ public class StripedBlockUtil {
    * @throws InterruptedException
    */
   public static StripedReadResult getNextCompletedStripedRead(
-      CompletionService<Void> readService, Map<Future<Void>,
-      Integer> futures, final long threshold) throws InterruptedException {
+      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
+      final long threshold) throws InterruptedException {
     Preconditions.checkArgument(!futures.isEmpty());
     Preconditions.checkArgument(threshold > 0);
     Future<Void> future = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7f0730b..f802128 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2312,11 +2312,11 @@
   </description>
 </property>
 
-  <property>
-    <name>dfs.datanode.block-pinning.enabled</name>
-    <value>false</value>
-    <description>Whether pin blocks on favored DataNode.</description>
-  </property>
+<property>
+  <name>dfs.datanode.block-pinning.enabled</name>
+  <value>false</value>
+  <description>Whether pin blocks on favored DataNode.</description>
+</property>
 
 <property>
   <name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name>
@@ -2354,4 +2354,25 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.stripedread.threshold.millis</name>
+  <value>5000</value>
+  <description>datanode striped read threshold in millisecond.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.stripedread.threads</name>
+  <value>20</value>
+  <description>datanode striped read thread pool size.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.stripedread.buffer.size</name>
+  <value>262144</value>
+  <description>datanode striped read buffer size.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6616de24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
new file mode 100644
index 0000000..b4f05d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestRecoverStripedFile {
+  public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class);
+  
+  private static final int dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
+  private static final int parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
+  private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private static final int blockSize = cellSize * 3;
+  private static final int groupSize = dataBlkNum + parityBlkNum;
+  private static final int dnNum = groupSize + parityBlkNum;
+  
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private DistributedFileSystem fs;
+  // Map: DatanodeID -> datanode index in cluster
+  private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
+
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();;
+    cluster.waitActive();
+    
+    fs = cluster.getFileSystem();
+    fs.getClient().createErasureCodingZone("/", null);
+
+    List<DataNode> datanodes = cluster.getDataNodes();
+    for (int i = 0; i < dnNum; i++) {
+      dnMap.put(datanodes.get(i).getDatanodeId(), i);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneParityBlock() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverThreeParityBlocks() throws Exception {
+    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverThreeDataBlocks() throws Exception {
+    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneDataBlock() throws Exception {
+    ////TODO: TODO: wait for HADOOP-11847
+    //int fileLen = 10 * blockSize + blockSize/10;
+    //assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverAnyBlocks() throws Exception {
+    ////TODO: TODO: wait for HADOOP-11847
+    //int fileLen = 3 * blockSize + blockSize/10;
+    //assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+  }
+  
+  /**
+   * Test the file blocks recovery.
+   * 1. Check the replica is recovered in the target datanode, 
+   *    and verify the block replica length, generationStamp and content.
+   * 2. Read the file and verify content. 
+   */
+  private void assertFileBlocksRecovery(String fileName, int fileLen,
+      int recovery, int toRecoverBlockNum) throws Exception {
+    if (recovery != 0 && recovery != 1 && recovery != 2) {
+      Assert.fail("Invalid recovery: 0 is to recovery parity blocks,"
+          + "1 is to recovery data blocks, 2 is any.");
+    }
+    if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
+      Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
+    }
+    
+    Path file = new Path(fileName);
+    
+    testCreateStripedFile(file, fileLen);
+    
+    LocatedBlocks locatedBlocks = getLocatedBlocks(file);
+    assertEquals(locatedBlocks.getFileLength(), fileLen);
+    
+    LocatedStripedBlock lastBlock = 
+        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+    
+    DatanodeInfo[] storageInfos = lastBlock.getLocations();
+    int[] indices = lastBlock.getBlockIndices();
+    
+    BitSet bitset = new BitSet(dnNum);
+    for (DatanodeInfo storageInfo : storageInfos) {
+      bitset.set(dnMap.get(storageInfo));
+    }
+    
+    int[] toDead = new int[toRecoverBlockNum];
+    int n = 0;
+    for (int i = 0; i < indices.length; i++) {
+      if (n < toRecoverBlockNum) {
+        if (recovery == 0) {
+          if (indices[i] >= dataBlkNum) {
+            toDead[n++] = i;
+          }
+        } else if (recovery == 1) {
+          if (indices[i] < dataBlkNum) {
+            toDead[n++] = i;
+          }
+        } else {
+          toDead[n++] = i;
+        }
+      } else {
+        break;
+      }
+    }
+    
+    DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
+    int[] deadDnIndices = new int[toRecoverBlockNum];
+    ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum];
+    File[] replicas = new File[toRecoverBlockNum];
+    File[] metadatas = new File[toRecoverBlockNum];
+    byte[][] replicaContents = new byte[toRecoverBlockNum][];
+    for (int i = 0; i < toRecoverBlockNum; i++) {
+      dataDNs[i] = storageInfos[toDead[i]];
+      deadDnIndices[i] = dnMap.get(dataDNs[i]);
+      
+      // Check the block replica file on deadDn before it dead.
+      blocks[i] = StripedBlockUtil.constructStripedBlock(
+          lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
+      replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
+      metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
+      // the block replica on the datanode should be the same as expected
+      assertEquals(replicas[i].length(), 
+          StripedBlockUtil.getStripedBlockLength(
+          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
+      assertTrue(metadatas[i].getName().
+          endsWith(blocks[i].getGenerationStamp() + ".meta"));
+      replicaContents[i] = readReplica(replicas[i]);
+    }
+    
+    try {
+      DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum];
+      for (int i = 0; i < toRecoverBlockNum; i++) {
+        /*
+         * Kill the datanode which contains one replica
+         * We need to make sure it dead in namenode: clear its update time and 
+         * trigger NN to check heartbeat.
+         */
+        DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
+        dn.shutdown();
+        dnIDs[i] = dn.getDatanodeId();
+      }
+      setDataNodesDead(dnIDs);
+       
+      
+      // Check the locatedBlocks of the file again
+      locatedBlocks = getLocatedBlocks(file);
+      lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+      storageInfos = lastBlock.getLocations();
+      assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+      
+      int[] targetDNs = new int[dnNum - groupSize];
+      n = 0;
+      for (int i = 0; i < dnNum; i++) {
+        if (!bitset.get(i)) { // not contain replica of the block.
+          targetDNs[n++] = i;
+        }
+      }
+      
+      waitForRecoveryFinished(file);
+      
+      targetDNs = sortTargetsByReplicas(blocks, targetDNs);
+      
+      // Check the replica on the new target node.
+      for (int i = 0; i < toRecoverBlockNum; i++) {
+        File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+        File metadataAfterRecovery = 
+            cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
+        assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+        assertTrue(metadataAfterRecovery.getName().
+            endsWith(blocks[i].getGenerationStamp() + ".meta"));
+        byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery);
+        
+        Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
+      }
+    } finally {
+      for (int i = 0; i < toRecoverBlockNum; i++) {
+        restartDataNode(toDead[i]);
+      }
+      cluster.waitActive();
+    }
+    fs.delete(file, true);
+  }
+  
+  private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException {
+    for (DatanodeID dn : dnIDs) {
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn);
+      DFSTestUtil.setDatanodeDead(dnd);
+    }
+    
+    BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager());
+  }
+  
+  private void restartDataNode(int dn) {
+    try {
+      cluster.restartDataNode(dn, true, true);
+    } catch (IOException e) {
+    }
+  }
+  
+  private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
+    int[] result = new int[blocks.length];
+    for (int i = 0; i < blocks.length; i++) {
+      result[i] = -1;
+      for (int j = 0; j < targetDNs.length; j++) {
+        if (targetDNs[j] != -1) {
+          File replica = cluster.getBlockFile(targetDNs[j], blocks[i]);
+          if (replica != null) {
+            result[i] = targetDNs[j];
+            targetDNs[j] = -1;
+            break;
+          }
+        }
+      }
+      if (result[i] == -1) {
+        Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId());
+      }
+    }
+    return result;
+  }
+  
+  private byte[] readReplica(File replica) throws IOException {
+    int length = (int)replica.length();
+    ByteArrayOutputStream content = new ByteArrayOutputStream(length);
+    FileInputStream in = new FileInputStream(replica);
+    try {
+      byte[] buffer = new byte[1024];
+      int total = 0;
+      while (total < length) {
+        int n = in.read(buffer);
+        if (n <= 0) {
+          break;
+        }
+        content.write(buffer, 0, n);
+        total += n;
+      }
+      if (total < length) {
+        Assert.fail("Failed to read all content of replica");
+      }
+      return content.toByteArray();
+    } finally {
+      in.close();
+    }
+  }
+  
+  private LocatedBlocks waitForRecoveryFinished(Path file) throws Exception {
+    final int ATTEMPTS = 60;
+    for (int i = 0; i < ATTEMPTS; i++) {
+      LocatedBlocks locatedBlocks = getLocatedBlocks(file);
+      LocatedStripedBlock lastBlock = 
+          (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+      DatanodeInfo[] storageInfos = lastBlock.getLocations();
+      if (storageInfos.length >= groupSize) {
+        return locatedBlocks;
+      }
+      Thread.sleep(1000);
+    }
+    throw new IOException ("Time out waiting for EC block recovery.");
+  }
+  
+  private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
+    return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
+  }
+  
+  private void testCreateStripedFile(Path file, int dataLen)
+      throws IOException {
+    final byte[] data = new byte[dataLen];
+    DFSUtil.getRandom().nextBytes(data);
+    writeContents(file, data);
+  }
+  
+  void writeContents(Path file, byte[] contents)
+      throws IOException {
+    FSDataOutputStream out = fs.create(file);
+    try {
+      out.write(contents, 0, contents.length);
+    } finally {
+      out.close();
+    }
+  }
+}