You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/12/19 00:58:02 UTC

hadoop git commit: HDFS-9173. Erasure Coding: Lease recovery for striped file. Contributed by Walter Su and Jing Zhao.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 85c246604 -> 61ab0440f


HDFS-9173. Erasure Coding: Lease recovery for striped file. Contributed by Walter Su and Jing Zhao.

Change-Id: I51703a61c9d8454f883028f3f6acb5729fde1b15


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61ab0440
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61ab0440
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61ab0440

Branch: refs/heads/trunk
Commit: 61ab0440f7eaff0f631cbae0378403912f88d7ad
Parents: 85c2466
Author: Zhe Zhang <zh...@apache.org>
Authored: Fri Dec 18 15:57:48 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Fri Dec 18 15:57:48 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSStripedOutputStream.java     |   4 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  37 ++-
 .../server/blockmanagement/DatanodeManager.java |  12 +-
 .../server/datanode/BlockRecoveryWorker.java    | 238 ++++++++++++++++-
 .../hdfs/server/namenode/FSNamesystem.java      |  58 ++---
 .../server/protocol/BlockRecoveryCommand.java   |  33 +++
 .../hadoop-hdfs/src/main/proto/HdfsServer.proto |   6 +-
 .../apache/hadoop/hdfs/StripedFileTestUtil.java |  10 +-
 .../hadoop/hdfs/TestLeaseRecoveryStriped.java   | 257 +++++++++++++++++++
 .../hdfs/server/datanode/TestBlockRecovery.java |  34 ++-
 .../TestCommitBlockSynchronization.java         |   5 +-
 .../namenode/TestRecoverStripedBlocks.java      |   5 +-
 13 files changed, 634 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 546fa92..e1ff844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -35,6 +35,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
@@ -970,7 +971,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
   }
 
-  private void enqueueAllCurrentPackets() throws IOException {
+  @VisibleForTesting
+  void enqueueAllCurrentPackets() throws IOException {
     int idx = streamers.indexOf(getCurrentStreamer());
     for(int i = 0; i < streamers.size(); i++) {
       final StripedDataStreamer si = setCurrentStreamer(i);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 76d5c24..2286203 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -873,6 +873,9 @@ Trunk (Unreleased)
       HDFS-9373. Erasure coding: friendly log information for write operations
       with some failed streamers. (Li Bo via zhz)
 
+      HDFS-9173. Erasure Coding: Lease recovery for striped file. (Walter Su and
+      Jing Zhao via zhz)
+
       HDFS-9451. Clean up depreated umasks and related unit tests.
       (Wei-Chiu Chuang via wheat9)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 9271e33..d7a793a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
@@ -359,6 +360,12 @@ public class PBHelper {
     builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
     if(b.getNewBlock() != null)
       builder.setTruncateBlock(PBHelperClient.convert(b.getNewBlock()));
+    if (b instanceof RecoveringStripedBlock) {
+      RecoveringStripedBlock sb = (RecoveringStripedBlock) b;
+      builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
+          sb.getErasureCodingPolicy()));
+      builder.addAllBlockIndices(asList(sb.getBlockIndices()));
+    }
     return builder.build();
   }
 
@@ -372,6 +379,16 @@ public class PBHelper {
       rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
           b.getNewGenStamp());
     }
+
+    if (b.hasEcPolicy()) {
+      List<Integer> BlockIndicesList = b.getBlockIndicesList();
+      int[] indices = new int[BlockIndicesList.size()];
+      for (int i = 0; i < BlockIndicesList.size(); i++) {
+        indices[i] = BlockIndicesList.get(i).shortValue();
+      }
+      rBlock = new RecoveringStripedBlock(rBlock, indices,
+          PBHelperClient.convertErasureCodingPolicy(b.getEcPolicy()));
+    }
     return rBlock;
   }
 
@@ -823,12 +840,20 @@ public class PBHelper {
         build();
   }
 
-  private static List<Integer> convertIntArray(short[] liveBlockIndices) {
-    List<Integer> liveBlockIndicesList = new ArrayList<>();
-    for (short s : liveBlockIndices) {
-      liveBlockIndicesList.add((int) s);
+  private static List<Integer> asList(int[] arr) {
+    List<Integer> list = new ArrayList<>(arr.length);
+    for (int s : arr) {
+      list.add(s);
+    }
+    return list;
+  }
+
+  private static List<Integer> asList(short[] arr) {
+    List<Integer> list = new ArrayList<>(arr.length);
+    for (int s : arr) {
+      list.add(s);
     }
-    return liveBlockIndicesList;
+    return list;
   }
 
   private static StorageTypesProto convertStorageTypesProto(
@@ -925,7 +950,7 @@ public class PBHelper {
     builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
 
     short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
-    builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+    builder.addAllLiveBlockIndices(asList(liveBlockIndices));
 
     builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
         blockEcRecoveryInfo.getErasureCodingPolicy()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index cdd5b9e..0c0c01a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -504,6 +505,7 @@ public class DatanodeManager {
   public DatanodeStorageInfo[] getDatanodeStorageInfos(
       DatanodeID[] datanodeID, String[] storageIDs,
       String format, Object... args) throws UnregisteredNodeException {
+    storageIDs = storageIDs == null ? new String[0] : storageIDs;
     if (datanodeID.length != storageIDs.length) {
       final String err = (storageIDs.length == 0?
           "Missing storageIDs: It is likely that the HDFS client,"
@@ -524,9 +526,11 @@ public class DatanodeManager {
         continue;
       }
       final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
-      storages[i] = dd.getStorageInfo(storageIDs[i]);
+      if (dd != null) {
+        storages[i] = dd.getStorageInfo(storageIDs[i]);
+      }
     }
-    return storages; 
+    return storages;
   }
 
   /** Prints information about all datanodes. */
@@ -1366,6 +1370,10 @@ public class DatanodeManager {
       } else {
         rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
             uc.getBlockRecoveryId());
+        if (b.isStriped()) {
+          rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(),
+              ((BlockInfoStriped) b).getErasureCodingPolicy());
+        }
       }
       brCommand.add(rBlock);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
index 4ad1168..db0c6ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -17,16 +17,21 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.ipc.RemoteException;
@@ -37,7 +42,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
 /**
  * This class handles the block recovery work commands.
@@ -78,6 +88,10 @@ public class BlockRecoveryWorker {
           newLength);
     }
 
+    public ReplicaRecoveryInfo getReplicaRecoveryInfo(){
+      return rInfo;
+    }
+
     @Override
     public String toString() {
       return "block:" + rInfo + " node:" + id;
@@ -294,12 +308,8 @@ public class BlockRecoveryWorker {
       // we never know the actual state of the replica on failed data-nodes.
       // The recovery should be started over.
       if (!failedList.isEmpty()) {
-        StringBuilder b = new StringBuilder();
-        for(DatanodeID id : failedList) {
-          b.append("\n  " + id);
-        }
-        throw new IOException("Cannot recover " + block + ", the following "
-            + failedList.size() + " data-nodes failed {" + b + "\n}");
+        throw new IOException("Cannot recover " + block
+            + ", the following datanodes failed: " + failedList);
       }
 
       // Notify the name-node about successfully recovered replicas.
@@ -323,6 +333,215 @@ public class BlockRecoveryWorker {
     }
   }
 
+  /**
+   * blk_0  blk_1  blk_2  blk_3  blk_4  blk_5  blk_6  blk_7  blk_8
+   *  64k    64k    64k    64k    64k    64k    64k    64k    64k   <-- stripe_0
+   *  64k    64k    64k    64k    64k    64k    64k    64k    64k
+   *  64k    64k    64k    64k    64k    64k    64k    61k    <-- startStripeIdx
+   *  64k    64k    64k    64k    64k    64k    64k
+   *  64k    64k    64k    64k    64k    64k    59k
+   *  64k    64k    64k    64k    64k    64k
+   *  64k    64k    64k    64k    64k    64k                <-- last full stripe
+   *  64k    64k    13k    64k    55k     3k              <-- target last stripe
+   *  64k    64k           64k     1k
+   *  64k    64k           58k
+   *  64k    64k
+   *  64k    19k
+   *  64k                                               <-- total visible stripe
+   *
+   *  Due to different speed of streamers, the internal blocks in a block group
+   *  could have different lengths when the block group isn't ended normally.
+   *  The purpose of this class is to recover the UnderConstruction block group,
+   *  so all internal blocks end at the same stripe.
+   *
+   * The steps:
+   * 1. get all blocks lengths from DataNodes.
+   * 2. calculate safe length, which is at the target last stripe.
+   * 3. decode and feed blk_6~8, make them end at last full stripe. (the last
+   * full stripe means the last decodable stripe.)
+   * 4. encode the target last stripe, with the remaining sequential data. In
+   * this case, the sequential data is 64k+64k+13k. Feed blk_6~8 the parity cells.
+   * Overwrite the existing parity cells if necessary.
+   * 5. truncate the stripes from visible stripe, to target last stripe.
+   * TODO: implement step 3,4
+   */
+  public class RecoveryTaskStriped {
+    private final RecoveringBlock rBlock;
+    private final ExtendedBlock block;
+    private final String bpid;
+    private final DatanodeInfo[] locs;
+    private final long recoveryId;
+
+    private final int[] blockIndices;
+    private final ErasureCodingPolicy ecPolicy;
+
+    /** Captures the recovery parameters carried by the given striped block. */
+    RecoveryTaskStriped(RecoveringStripedBlock rBlock) {
+      // TODO: support truncate
+      Preconditions.checkArgument(rBlock.getNewBlock() == null);
+      this.rBlock = rBlock;
+      this.block = rBlock.getBlock();
+      this.bpid = block.getBlockPoolId();
+      this.locs = rBlock.getLocations();
+      this.recoveryId = rBlock.getNewGenerationStamp();
+      this.blockIndices = rBlock.getBlockIndices();
+      this.ecPolicy = rBlock.getErasureCodingPolicy();
+    }
+
+    /** Truncates reachable internal blocks to a safe length and commits it. */
+    protected void recover() throws IOException {
+      checkLocations(locs.length);
+
+      Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
+      final int dataBlkNum = ecPolicy.getNumDataUnits();
+      final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
+      // Collect replica info; keep the longest replica per internal block.
+      for (int i = 0; i < locs.length; i++) {
+        DatanodeID id = locs[i];
+        try {
+          DatanodeID bpReg = new DatanodeID(
+              datanode.getBPOfferService(bpid).bpRegistration);
+          InterDatanodeProtocol proxyDN = bpReg.equals(id) ?
+              datanode : DataNode.createInterDataNodeProtocolProxy(id, conf,
+              dnConf.socketTimeout, dnConf.connectToDnViaHostname);
+          ExtendedBlock internalBlk = new ExtendedBlock(block);
+          final long blockId = block.getBlockId() + blockIndices[i];
+          internalBlk.setBlockId(blockId);
+          ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN,
+              new RecoveringBlock(internalBlk, null, recoveryId));
+
+          if (info != null &&
+              info.getGenerationStamp() >= block.getGenerationStamp() &&
+              info.getNumBytes() > 0) {
+            final BlockRecord existing = syncBlocks.get(blockId);
+            if (existing == null ||
+                info.getNumBytes() > existing.rInfo.getNumBytes()) {
+              // if we have >1 replicas for the same internal block, we
+              // simply choose the one with larger length.
+              // TODO: better usage of redundant replicas
+              syncBlocks.put(blockId, new BlockRecord(id, proxyDN, info));
+            }
+          }
+        } catch (RecoveryInProgressException ripE) {
+          InterDatanodeProtocol.LOG.warn(
+              "Recovery for replica " + block + " on data-node " + id
+                  + " is already in progress. Recovery id = "
+                  + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
+          return;
+        } catch (IOException e) {
+          InterDatanodeProtocol.LOG.warn(
+              "Failed to obtain replica info for block (=" + block
+                  + ") from datanode (=" + id + ")", e);
+        }
+      }
+      checkLocations(syncBlocks.size());
+
+      final long safeLength = getSafeLength(syncBlocks);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recovering block " + block
+            + ", length=" + block.getNumBytes() + ", safeLength=" + safeLength
+            + ", syncList=" + syncBlocks);
+      }
+
+      // If some internal blocks reach the safe length, convert them to RUR
+      List<BlockRecord> rurList = new ArrayList<>(locs.length);
+      for (BlockRecord r : syncBlocks.values()) {
+        int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK);
+        long newSize = getInternalBlockLength(safeLength, ecPolicy.getCellSize(),
+            dataBlkNum, blockIndex);
+        if (r.rInfo.getNumBytes() >= newSize) {
+          rurList.add(r);
+        }
+      }
+      assert rurList.size() >= dataBlkNum : "incorrect safe length";
+
+      // Recover the striped block by truncating internal blocks to the safe
+      // length. Abort if there is any failure in this step.
+      truncatePartialBlock(rurList, safeLength);
+
+      // notify Namenode the new size and locations
+      final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
+      final String[] newStorages = new String[totalBlkNum];
+      // Start every slot empty. blockIndices has one entry per location, not
+      // per internal block, so indexing it up to totalBlkNum can overrun it.
+      Arrays.fill(newLocs, DatanodeID.EMPTY_DATANODE_ID);
+      Arrays.fill(newStorages, "");
+      for (BlockRecord r : rurList) {
+        int index = (int) (r.rInfo.getBlockId() &
+            HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
+        newLocs[index] = r.id;
+        newStorages[index] = r.storageID;
+      }
+      // recoveryId becomes the generation stamp of the recovered block group.
+      DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
+      nn.commitBlockSynchronization(block, recoveryId, safeLength, true, false,
+          newLocs, newStorages);
+    }
+
+    /** Truncates each record to its share of safeLength; throws on failure. */
+    private void truncatePartialBlock(List<BlockRecord> rurList,
+        long safeLength) throws IOException {
+      int cellSize = ecPolicy.getCellSize();
+      int dataBlkNum = ecPolicy.getNumDataUnits();
+      List<DatanodeID> failedList = new ArrayList<>();
+      for (BlockRecord r : rurList) {
+        int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK);
+        long newSize = getInternalBlockLength(safeLength, cellSize, dataBlkNum,
+            blockIndex);
+        try {
+          r.updateReplicaUnderRecovery(bpid, recoveryId, r.rInfo.getBlockId(),
+              newSize);
+        } catch (IOException e) {
+          InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+              + r.rInfo + ", datanode=" + r.id + ")", e);
+          failedList.add(r.id);
+        }
+      }
+
+      // A failed update leaves that replica in an unknown state, so the whole
+      // recovery is aborted here and has to be started over.
+      if (!failedList.isEmpty()) {
+        throw new IOException("Cannot recover " + block
+            + ", the following datanodes failed: " + failedList);
+      }
+    }
+
+    /**
+     * Returns the length of the data covered by the full stripes of the block
+     * group, where a full stripe has at least dataBlkNum full cells.
+     * TODO: this assumes parity cells are only generated from full stripes,
+     * which is no longer true once hflush is supported.
+     */
+    @VisibleForTesting
+    long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
+      final int cellSize = ecPolicy.getCellSize();
+      final int dataBlkNum = ecPolicy.getNumDataUnits();
+      Preconditions.checkArgument(syncBlocks.size() >= dataBlkNum);
+      final int stripeSize = dataBlkNum * cellSize;
+      long[] blockLengths = new long[syncBlocks.size()];
+      int i = 0;
+      for (BlockRecord r : syncBlocks.values()) {
+        blockLengths[i++] = r.getReplicaRecoveryInfo().getNumBytes();
+      }
+      Arrays.sort(blockLengths);
+      // A full stripe has at least dataBlkNum full cells. The index is kept as
+      // a long so the multiplication below cannot overflow for large blocks.
+      final long lastFullStripeIdx =
+          blockLengths[blockLengths.length - dataBlkNum] / cellSize;
+      // TODO: Include lastFullStripeIdx+1 stripe in safeLength, if there exists
+      // such a stripe (and it must be partial).
+      return lastFullStripeIdx * stripeSize;
+    }
+
+    /** Throws if fewer than the EC policy's data-unit count are available. */
+    private void checkLocations(int locationCount) throws IOException {
+      if (ecPolicy.getNumDataUnits() > locationCount) {
+        throw new IOException(block + " has no enough internal blocks" +
+            ", unable to start recovery. Locations=" + Arrays.asList(locs));
+      }
+    }
+  }
+
   private static void logRecoverBlock(String who, RecoveringBlock rb) {
     ExtendedBlock block = rb.getBlock();
     DatanodeInfo[] targets = rb.getLocations();
@@ -379,8 +598,11 @@ public class BlockRecoveryWorker {
         for(RecoveringBlock b : blocks) {
           try {
             logRecoverBlock(who, b);
-            RecoveryTaskContiguous task = new RecoveryTaskContiguous(b);
-            task.recover();
+            if (b.isStriped()) {
+              new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
+            } else {
+              new RecoveryTaskContiguous(b).recover();
+            }
           } catch (IOException e) {
             LOG.warn("recoverBlocks FAILED: " + b, e);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b25c5f7..97cb6fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -197,7 +197,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
@@ -3285,57 +3284,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           storedBlock.setNumBytes(newlength);
         }
 
-        // find the DatanodeDescriptor objects
-        ArrayList<DatanodeDescriptor> trimmedTargets =
-            new ArrayList<DatanodeDescriptor>(newtargets.length);
-        ArrayList<String> trimmedStorages =
-            new ArrayList<String>(newtargets.length);
-        if (newtargets.length > 0) {
-          for (int i = 0; i < newtargets.length; ++i) {
-            // try to get targetNode
-            DatanodeDescriptor targetNode =
-                blockManager.getDatanodeManager().getDatanode(newtargets[i]);
-            if (targetNode != null) {
-              trimmedTargets.add(targetNode);
-              trimmedStorages.add(newtargetstorages[i]);
-            } else if (LOG.isDebugEnabled()) {
-              LOG.debug("DatanodeDescriptor (=" + newtargets[i] + ") not found");
-            }
-          }
-        }
-        if ((closeFile) && !trimmedTargets.isEmpty()) {
+        // Find the target DatanodeStorageInfos. If not found because of invalid
+        // or empty DatanodeID/StorageID, the slot of same offset in dsInfos is
+        // null
+        final DatanodeStorageInfo[] dsInfos = blockManager.getDatanodeManager().
+            getDatanodeStorageInfos(newtargets, newtargetstorages,
+                "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
+                src, oldBlock, newgenerationstamp, newlength);
+
+        if (closeFile && dsInfos != null) {
           // the file is getting closed. Insert block locations into blockManager.
           // Otherwise fsck will report these blocks as MISSING, especially if the
           // blocksReceived from Datanodes take a long time to arrive.
-          for (int i = 0; i < trimmedTargets.size(); i++) {
-            DatanodeStorageInfo storageInfo =
-                trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
-            if (storageInfo != null) {
+          for (int i = 0; i < dsInfos.length; i++) {
+            if (dsInfos[i] != null) {
               if(copyTruncate) {
-                storageInfo.addBlock(truncatedBlock, truncatedBlock);
+                dsInfos[i].addBlock(truncatedBlock, truncatedBlock);
               } else {
-                storageInfo.addBlock(storedBlock, storedBlock);
+                Block bi = new Block(storedBlock);
+                if (storedBlock.isStriped()) {
+                  bi.setBlockId(bi.getBlockId() + i);
+                }
+                dsInfos[i].addBlock(storedBlock, bi);
               }
             }
           }
         }
 
         // add pipeline locations into the INodeUnderConstruction
-        DatanodeStorageInfo[] trimmedStorageInfos =
-            blockManager.getDatanodeManager().getDatanodeStorageInfos(
-                trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
-                trimmedStorages.toArray(new String[trimmedStorages.size()]),
-                "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
-                src, oldBlock, newgenerationstamp, newlength);
-
         if(copyTruncate) {
-          iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
+          iFile.convertLastBlockToUC(truncatedBlock, dsInfos);
         } else {
-          iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
+          iFile.convertLastBlockToUC(storedBlock, dsInfos);
           if (closeFile) {
             blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
                 storedBlock, oldGenerationStamp, oldNumBytes,
-                trimmedStorageInfos);
+                dsInfos);
           }
         }
       }
@@ -3343,7 +3327,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (closeFile) {
         if(copyTruncate) {
           closeFileCommitBlocks(src, iFile, truncatedBlock);
-          if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) {
+          if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
             blockManager.removeBlock(storedBlock);
           }
         } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
index 3adc85c..8dc9d39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
@@ -76,6 +77,13 @@ public class BlockRecoveryCommand extends DatanodeCommand {
       this.recoveryBlock = recoveryBlock;
     }
 
+    public RecoveringBlock(RecoveringBlock rBlock) {
+      super(rBlock.getBlock(), rBlock.getLocations(), rBlock.getStorageIDs(),
+          rBlock.getStorageTypes());
+      this.newGenerationStamp = rBlock.newGenerationStamp;
+      this.recoveryBlock = rBlock.recoveryBlock;
+    }
+
     /**
      * Return the new generation stamp of the block,
      * which also plays role of the recovery id.
@@ -92,6 +100,31 @@ public class BlockRecoveryCommand extends DatanodeCommand {
     }
   }
 
+  public static class RecoveringStripedBlock extends RecoveringBlock {
+    private final int[] blockIndices;
+    private final ErasureCodingPolicy ecPolicy;
+
+    public RecoveringStripedBlock(RecoveringBlock rBlock, int[] blockIndices,
+        ErasureCodingPolicy ecPolicy) {
+      super(rBlock);
+      this.blockIndices = blockIndices;
+      this.ecPolicy = ecPolicy;
+    }
+
+    public int[] getBlockIndices() {
+      return blockIndices;
+    }
+
+    public ErasureCodingPolicy getErasureCodingPolicy() {
+      return ecPolicy;
+    }
+
+    @Override
+    public boolean isStriped() {
+      return true;
+    }
+  }
+
   /**
    * Create empty BlockRecoveryCommand.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
index 66b2a33..453ba29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
@@ -120,6 +120,10 @@ message RecoveringBlockProto {
   required uint64 newGenStamp = 1;        // New genstamp post recovery
   required LocatedBlockProto block = 2;   // Block to be recovered
   optional BlockProto truncateBlock = 3;  // New block for recovery (truncate)
+
+  optional ErasureCodingPolicyProto ecPolicy = 4;
+  // block indices of striped internal blocks for each storage in LocatedBlock
+  repeated uint32 blockIndices = 5;
 }
 
 /**
@@ -195,4 +199,4 @@ message NamenodeRegistrationProto {
   }
   required StorageInfoProto storageInfo = 3;  // Node information
   optional NamenodeRoleProto role = 4 [default = NAMENODE];        // Namenode role
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 9942a2d..2f54078 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -59,13 +59,11 @@ public class StripedFileTestUtil {
   public static final short NUM_DATA_BLOCKS = (short) 6;
   public static final short NUM_PARITY_BLOCKS = (short) 3;
   public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
-  public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
-
-  public static final int stripesPerBlock = 4;
-  public static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
-  public static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
-  public static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
 
+  static int stripesPerBlock = 4;
+  public static int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
+  static int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+  static int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
 
   static byte[] generateBytes(int cnt) {
     byte[] bytes = new byte[cnt];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
new file mode 100644
index 0000000..38ee67a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
@@ -0,0 +1,257 @@
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class TestLeaseRecoveryStriped {
+  public static final Log LOG = LogFactory.getLog(TestLeaseRecoveryStriped.class);
+
+  private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
+  private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
+  private static final int CELL_SIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+  private static final int STRIPE_SIZE = NUM_DATA_BLOCKS * CELL_SIZE;
+  private static final int STRIPES_PER_BLOCK = 15;
+  private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+  private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
+  private static final int bytesPerChecksum = 512;
+
+  static {
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+    StripedFileTestUtil.stripesPerBlock = STRIPES_PER_BLOCK;
+    StripedFileTestUtil.blockSize = BLOCK_SIZE;
+    StripedFileTestUtil.BLOCK_GROUP_SIZE = BLOCK_GROUP_SIZE;
+  }
+
+  static private final String fakeUsername = "fakeUser1";
+  static private final String fakeGroup = "supergroup";
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+  private Configuration conf;
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+  final Path p = new Path(dir, "testfile");
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    dfs.mkdirs(dir);
+    dfs.setErasureCodingPolicy(dir, null);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public static final int[][][] BLOCK_LENGTHS_SUITE = {
+      {{ 11 * CELL_SIZE,10 * CELL_SIZE, 9 * CELL_SIZE,
+          8 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
+          5 * CELL_SIZE, 4 * CELL_SIZE, 3 * CELL_SIZE},
+          {36 * CELL_SIZE}},
+
+      {{  3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE,
+          6 * CELL_SIZE, 7 * CELL_SIZE, 8 * CELL_SIZE,
+          9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
+          {36 * CELL_SIZE}},
+
+      {{ 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
+          5 * CELL_SIZE, 4 * CELL_SIZE, 2 * CELL_SIZE,
+          9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
+          {36 * CELL_SIZE}},
+
+      {{  8 * CELL_SIZE + bytesPerChecksum,
+          7 * CELL_SIZE + bytesPerChecksum * 2,
+          6 * CELL_SIZE + bytesPerChecksum * 2,
+          5 * CELL_SIZE - bytesPerChecksum * 3,
+          4 * CELL_SIZE - bytesPerChecksum * 4,
+          3 * CELL_SIZE - bytesPerChecksum * 4,
+          9 * CELL_SIZE, 10 * CELL_SIZE, 11 * CELL_SIZE},
+          {36 * CELL_SIZE}},
+  };
+
+  @Test
+  public void testLeaseRecovery() throws Exception {
+    for(int i=0; i < BLOCK_LENGTHS_SUITE.length; i++){
+      int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
+      int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
+      try {
+        runTest(blockLengths, safeLength);
+      } catch (Throwable e){
+        String msg = "failed testCase at i=" + i + ", blockLengths="
+            + Arrays.toString(blockLengths) + "\n"
+            + StringUtils.stringifyException(e);
+        Assert.fail(msg);
+      }
+    }
+  }
+
+  private void runTest(int[] blockLengths, int safeLength) throws Exception {
+    writePartialBlocks(blockLengths);
+    recoverLease();
+
+    List<Long> oldGS = new ArrayList<>();
+    oldGS.add(1001L);
+    StripedFileTestUtil.checkData(dfs, p, safeLength,
+        new ArrayList<DatanodeInfo>(), oldGS);
+    // After recovery, storages are reported by the primary DN. Restart the
+    // NameNode so that we also verify the storages reported by blockReport.
+    cluster.restartNameNode(true);
+    StripedFileTestUtil.checkData(dfs, p, safeLength,
+        new ArrayList<DatanodeInfo>(), oldGS);
+  }
+
+  private void writePartialBlocks(int[] blockLengths) throws Exception {
+    final FSDataOutputStream out = dfs.create(p);
+    final DFSStripedOutputStream stripedOut
+        = (DFSStripedOutputStream) out.getWrappedStream();
+    int length = (STRIPES_PER_BLOCK - 1) * STRIPE_SIZE;
+    int[] posToKill = getPosToKill(blockLengths);
+    int checkingPos = nextCheckingPos(posToKill, 0);
+    try {
+      for (int pos = 0; pos < length; pos++) {
+        out.write(StripedFileTestUtil.getByte(pos));
+        if (pos == checkingPos) {
+          for (int index : getIndexToStop(posToKill, pos)) {
+            out.flush();
+            stripedOut.enqueueAllCurrentPackets();
+            StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
+            waitStreamerAllAcked(s);
+            waitByteSent(s, blockLengths[index]);
+            stopBlockStream(s);
+          }
+          checkingPos = nextCheckingPos(posToKill, pos);
+        }
+      }
+    } finally {
+      DFSTestUtil.abortStream(stripedOut);
+    }
+  }
+
+  private int nextCheckingPos(int[] posToKill, int curPos) {
+    int checkingPos = Integer.MAX_VALUE;
+    for (int i = 0; i < posToKill.length; i++) {
+      if (posToKill[i] > curPos) {
+        checkingPos = Math.min(checkingPos, posToKill[i]);
+      }
+    }
+    return checkingPos;
+  }
+
+  private int[] getPosToKill(int[] blockLengths) {
+    int[] posToKill = new int[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS];
+    for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
+      int numStripe = (blockLengths[i] - 1) / CELL_SIZE;
+      posToKill[i] = numStripe * STRIPE_SIZE
+          + i * CELL_SIZE + blockLengths[i] % CELL_SIZE;
+      if (blockLengths[i] % CELL_SIZE == 0) {
+        posToKill[i] += CELL_SIZE;
+      }
+    }
+    for (int i = NUM_DATA_BLOCKS; i < NUM_DATA_BLOCKS+NUM_PARITY_BLOCKS; i++) {
+      Preconditions.checkArgument(blockLengths[i] % CELL_SIZE == 0);
+      int numStripe = (blockLengths[i]) / CELL_SIZE;
+      posToKill[i] = numStripe * STRIPE_SIZE;
+    }
+    return posToKill;
+  }
+
+  private List<Integer> getIndexToStop(int[] posToKill, int pos){
+    List<Integer> indices=new LinkedList<>();
+    for(int i=0;i<posToKill.length;i++){
+      if(pos==posToKill[i]){
+        indices.add(i);
+      }
+    }
+    return indices;
+  }
+
+  private void waitByteSent(final StripedDataStreamer s, final long byteSent)
+      throws Exception {
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return s.bytesSent >= byteSent;
+        }
+      }, 100, 3000);
+    } catch (TimeoutException e) { // chain e so the original timeout is kept
+      throw new IOException("Timeout waiting for streamer " + s +". Sent="
+          + s.bytesSent + ", expected="+byteSent, e);
+    }
+  }
+
+  private void stopBlockStream(StripedDataStreamer s) throws Exception {
+    IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
+    Whitebox.setInternalState(s, "blockStream",
+        new DataOutputStream(nullOutputStream));
+  }
+
+  private void recoverLease() throws Exception {
+    final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(conf);
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            return dfs2.recoverLease(p);
+          } catch (IOException e) {
+            return false; // deliberate: report "not recovered yet" and retry
+          }
+        }
+      }, 5000, 24000);
+    } catch (TimeoutException e) { // chain e so the original timeout is kept
+      throw new IOException("Timeout waiting for recoverLease()", e);
+    }
+  }
+
+  private FileSystem getFSAsAnotherUser(final Configuration c)
+      throws IOException, InterruptedException {
+    return FileSystem.get(FileSystem.getDefaultUri(c), c,
+        UserGroupInformation.createUserForTesting(fakeUsername,
+            new String[]{fakeGroup}).getUserName());
+  }
+
+  public static void waitStreamerAllAcked(DataStreamer s) throws IOException {
+    long toWaitFor = s.getLastQueuedSeqno();
+    s.waitForAckedSeqno(toWaitFor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 4e9b4f4..82f5423 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -39,7 +39,9 @@ import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -68,8 +71,10 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker.BlockRecord;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -93,6 +98,8 @@ import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Supplier;
 
+import static org.apache.hadoop.hdfs.TestLeaseRecoveryStriped.BLOCK_LENGTHS_SUITE;
+
 /**
  * This tests if sync all replicas in block recovery works correctly
  */
@@ -243,8 +250,7 @@ public class TestBlockRecovery {
     
     DatanodeInfo[] locs = new DatanodeInfo[]{
         mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
-    RecoveringBlock rBlock = new RecoveringBlock(block, 
-        locs, RECOVERY_ID);
+    RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
     ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
     BlockRecord record1 = new BlockRecord(
         DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1);
@@ -745,4 +751,28 @@ public class TestBlockRecovery {
       assertTrue(exceptionThrown);
     }
   }
+
+  @Test
+  public void testSafeLength() throws Exception {
+    ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager
+        .getSystemDefaultPolicy();
+    RecoveringStripedBlock rBlockStriped = new RecoveringStripedBlock(rBlock,
+        new int[9], ecPolicy);
+    BlockRecoveryWorker recoveryWorker = new BlockRecoveryWorker(dn);
+    BlockRecoveryWorker.RecoveryTaskStriped recoveryTask =
+        recoveryWorker.new RecoveryTaskStriped(rBlockStriped);
+
+    for (int i = 0; i < BLOCK_LENGTHS_SUITE.length; i++) {
+      int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
+      int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
+      Map<Long, BlockRecord> syncList = new HashMap<>();
+      for (int id = 0; id < blockLengths.length; id++) {
+        ReplicaRecoveryInfo rInfo = new ReplicaRecoveryInfo(id,
+            blockLengths[id], 0, null);
+        syncList.put((long) id, new BlockRecord(null, null, rInfo));
+      }
+      Assert.assertEquals("BLOCK_LENGTHS_SUITE[" + i + "]", safeLength,
+          recoveryTask.getSafeLength(syncList));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
index d4c5924..0460ad1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
@@ -199,14 +199,15 @@ public class TestCommitBlockSynchronization {
     FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
     DatanodeID[] newTargets = new DatanodeID[]{
         new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0, 0)};
+    String[] storageIDs = new String[]{"fake-storage-ID"};
 
     ExtendedBlock lastBlock = new ExtendedBlock();
     namesystemSpy.commitBlockSynchronization(
         lastBlock, genStamp, length, true,
-        false, newTargets, null);
+        false, newTargets, storageIDs);
 
     // Repeat the call to make sure it returns true
     namesystemSpy.commitBlockSynchronization(
-        lastBlock, genStamp, length, true, false, newTargets, null);
+        lastBlock, genStamp, length, true, false, newTargets, storageIDs);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index 101601e..3a5c135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -46,7 +46,6 @@ import java.util.List;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -183,7 +182,7 @@ public class TestRecoverStripedBlocks {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, StripedFileTestUtil.blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
         .build();
     try {
@@ -191,7 +190,7 @@ public class TestRecoverStripedBlocks {
       DistributedFileSystem fs = cluster.getFileSystem();
       BlockManager bm = cluster.getNamesystem().getBlockManager();
       fs.getClient().setErasureCodingPolicy("/", null);
-      int fileLen = NUM_DATA_BLOCKS * blockSize;
+      int fileLen = NUM_DATA_BLOCKS * StripedFileTestUtil.blockSize;
       Path p = new Path("/test2RecoveryTasksForSameBlockGroup");
       final byte[] data = new byte[fileLen];
       DFSTestUtil.writeFile(fs, p, data);