Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:14 UTC

[14/50] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6c6d758,1346ab3..8232ab9
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@@ -674,8 -648,8 +674,8 @@@ public class BlockManager implements Bl
        return false; // already completed (e.g. by syncBlock)
      
      final boolean b = commitBlock(lastBlock, commitBlock);
-     if (hasMinStorage(lastBlock)) {
-       completeBlock(bc, bc.numBlocks() - 1, false);
 -    if (countNodes(lastBlock).liveReplicas() >= minReplication) {
++      if (hasMinStorage(lastBlock)) {
+       completeBlock(lastBlock, false);
      }
      return b;
    }
@@@ -698,9 -666,9 +692,9 @@@
      }
  
      int numNodes = curBlock.numNodes();
 -    if (!force && numNodes < minReplication) {
 +    if (!force && !hasMinStorage(curBlock, numNodes)) {
-       throw new IOException("Cannot complete block: " +
-           "block does not satisfy minimal replication requirement.");
+       throw new IOException("Cannot complete block: "
+           + "block does not satisfy minimal replication requirement.");
      }
      if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
        throw new IOException(
@@@ -718,26 -683,10 +709,12 @@@
      // a "forced" completion when a file is getting closed by an
      // OP_CLOSE edit on the standby).
      namesystem.adjustSafeModeBlockTotals(0, 1);
 +    final int minStorage = curBlock.isStriped() ?
 +        ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
      namesystem.incrementSafeBlockCount(
 -        Math.min(numNodes, minReplication));
 +        Math.min(numNodes, minStorage), curBlock);
-     
-     // replace block in the blocksMap
-     return blocksMap.replaceBlock(completeBlock);
    }
  
-   private BlockInfo completeBlock(final BlockCollection bc,
-       final BlockInfo block, boolean force) throws IOException {
-     BlockInfo[] fileBlocks = bc.getBlocks();
-     for (int idx = 0; idx < fileBlocks.length; idx++) {
-       if (fileBlocks[idx] == block) {
-         return completeBlock(bc, idx, force);
-       }
-     }
-     return block;
-   }
-   
    /**
     * Force the given block in the given file to be marked as complete,
     * regardless of whether enough replicas are present. This is necessary
@@@ -1270,37 -1162,29 +1245,36 @@@
    private void markBlockAsCorrupt(BlockToMarkCorrupt b,
        DatanodeStorageInfo storageInfo,
        DatanodeDescriptor node) throws IOException {
--
-     if (b.stored.isDeleted()) {
 -    if (b.getCorrupted().isDeleted()) {
++    if (b.getStored().isDeleted()) {
        blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
            " corrupt as it does not belong to any file", b);
-       addToInvalidates(b.corrupted, node);
+       addToInvalidates(b.getCorrupted(), node);
        return;
 -    } 
 -    short expectedReplicas = b.getCorrupted().getReplication();
 +    }
 +    short expectedReplicas =
-         getExpectedReplicaNum(b.stored);
++        getExpectedReplicaNum(b.getStored());
  
      // Add replica to the data-node if it is not already there
      if (storageInfo != null) {
-       storageInfo.addBlock(b.stored, b.corrupted);
 -      storageInfo.addBlock(b.getStored());
++      storageInfo.addBlock(b.getStored(), b.getCorrupted());
      }
  
 -    // Add this replica to corruptReplicas Map
 -    corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
 -        b.getReason(), b.getReasonCode());
 +    // Add this replica to corruptReplicas Map. For striped blocks, we always
 +    // use the id of whole striped block group when adding to corruptReplicas
-     Block corrupted = new Block(b.corrupted);
-     if (b.stored.isStriped()) {
-       corrupted.setBlockId(b.stored.getBlockId());
++    Block corrupted = new Block(b.getCorrupted());
++    if (b.getStored().isStriped()) {
++      corrupted.setBlockId(b.getStored().getBlockId());
 +    }
-     corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason,
-         b.reasonCode);
++    corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.getReason(),
++        b.getReasonCode());
  
-     NumberReplicas numberOfReplicas = countNodes(b.stored);
+     NumberReplicas numberOfReplicas = countNodes(b.getStored());
      boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
          expectedReplicas;
 -    boolean minReplicationSatisfied =
 -        numberOfReplicas.liveReplicas() >= minReplication;
 +
-     boolean minReplicationSatisfied = hasMinStorage(b.stored,
++    boolean minReplicationSatisfied = hasMinStorage(b.getStored(),
 +        numberOfReplicas.liveReplicas());
 +
      boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
          (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
          expectedReplicas;
@@@ -1315,10 -1199,10 +1289,10 @@@
      if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
          || corruptedDuringWrite) {
        // the block is over-replicated so invalidate the replicas immediately
 -      invalidateBlock(b, node);
 +      invalidateBlock(b, node, numberOfReplicas);
      } else if (namesystem.isPopulatingReplQueues()) {
        // add the block to neededReplication
-       updateNeededReplications(b.stored, -1, 0);
+       updateNeededReplications(b.getStored(), -1, 0);
      }
    }
  
@@@ -1342,13 -1227,12 +1316,13 @@@
            "invalidation of {} on {} because {} replica(s) are located on " +
            "nodes with potentially out-of-date block reports", b, dn,
            nr.replicasOnStaleNodes());
-       postponeBlock(b.corrupted);
+       postponeBlock(b.getCorrupted());
        return false;
 -    } else if (nr.liveReplicas() >= 1) {
 -      // If we have at least one copy on a live node, then we can delete it.
 +    } else {
 +      // we already checked the number of replicas in the caller of this
 +      // function and know there are enough live replicas, so we can delete it.
-       addToInvalidates(b.corrupted, dn);
-       removeStoredBlock(b.stored, node);
+       addToInvalidates(b.getCorrupted(), dn);
+       removeStoredBlock(b.getStored(), node);
        blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
            b, dn);
        return true;
@@@ -1446,72 -1326,11 +1420,11 @@@
      namesystem.writeLock();
      try {
        synchronized (neededReplications) {
 -        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
 -          for (BlockInfo block : blocksToReplicate.get(priority)) {
 -            ReplicationWork rw = scheduleReplication(block, priority);
 +        for (int priority = 0; priority < blocksToRecover.size(); priority++) {
 +          for (BlockInfo block : blocksToRecover.get(priority)) {
-             // block should belong to a file
-             bc = getBlockCollection(block);
-             // abandoned block or block reopened for append
-             if (bc == null
-                 || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-               // remove from neededReplications
-               neededReplications.remove(block, priority);
-               continue;
-             }
- 
-             requiredReplication = getExpectedReplicaNum(block);
- 
-             // get a source data-node
-             containingNodes = new ArrayList<>();
-             List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
-             NumberReplicas numReplicas = new NumberReplicas();
-             List<Short> liveBlockIndices = new ArrayList<>();
-             final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
-                 containingNodes, liveReplicaNodes, numReplicas,
-                 liveBlockIndices, priority);
-             if(srcNodes == null || srcNodes.length == 0) {
-               // block can not be replicated from any node
-               LOG.debug("Block " + block + " cannot be recovered " +
-                   "from any node");
-               continue;
-             }
- 
-             // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
-             // not included in the numReplicas.liveReplicas() count
-             assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
- 
-             // do not schedule more if enough replicas is already pending
-             numEffectiveReplicas = numReplicas.liveReplicas() +
-                                     pendingReplications.getNumReplicas(block);
- 
-             if (numEffectiveReplicas >= requiredReplication) {
-               if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                    (blockHasEnoughRacks(block, requiredReplication)) ) {
-                 neededReplications.remove(block, priority); // remove from neededReplications
-                 blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-                         " it has enough replicas", block);
-                 continue;
-               }
-             }
- 
-             if (numReplicas.liveReplicas() < requiredReplication) {
-               additionalReplRequired = requiredReplication
-                   - numEffectiveReplicas;
-             } else {
-               additionalReplRequired = 1; // Needed on a new rack
-             }
-             if (block.isStriped()) {
-               short[] indices = new short[liveBlockIndices.size()];
-               for (int i = 0 ; i < liveBlockIndices.size(); i++) {
-                 indices[i] = liveBlockIndices.get(i);
-               }
-               ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
-                   containingNodes, liveReplicaNodes, additionalReplRequired,
-                   priority, indices);
-               recovWork.add(ecw);
-             } else {
-               recovWork.add(new ReplicationWork(block, bc, srcNodes,
-                   containingNodes, liveReplicaNodes, additionalReplRequired,
-                   priority));
++            BlockRecoveryWork rw = scheduleRecovery(block, priority);
+             if (rw != null) {
 -              work.add(rw);
++              recovWork.add(rw);
              }
            }
          }
@@@ -1520,9 -1339,8 +1433,9 @@@
        namesystem.writeUnlock();
      }
  
 +    // Step 2: choose target nodes for each recovery task
-     final Set<Node> excludedNodes = new HashSet<Node>();
+     final Set<Node> excludedNodes = new HashSet<>();
 -    for(ReplicationWork rw : work){
 +    for(BlockRecoveryWork rw : recovWork){
        // Exclude all of the containing nodes from being targets.
        // This list includes decommissioning or corrupt nodes.
        excludedNodes.clear();
@@@ -1533,101 -1351,21 +1446,24 @@@
        // choose replication targets: NOT HOLDING THE GLOBAL LOCK
        // It is costly to extract the filename for which chooseTargets is called,
        // so for now we pass in the block collection itself.
 -      rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
 +      final BlockPlacementPolicy placementPolicy =
-           placementPolicies.getPolicy(rw.block.isStriped());
++          placementPolicies.getPolicy(rw.getBlock().isStriped());
 +      rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
      }
  
 +    // Step 3: add tasks to the DN
      namesystem.writeLock();
      try {
 -      for(ReplicationWork rw : work){
 +      for(BlockRecoveryWork rw : recovWork){
-         final DatanodeStorageInfo[] targets = rw.targets;
+         final DatanodeStorageInfo[] targets = rw.getTargets();
          if(targets == null || targets.length == 0){
-           rw.targets = null;
+           rw.resetTargets();
            continue;
          }
  
          synchronized (neededReplications) {
-           BlockInfo block = rw.block;
-           int priority = rw.priority;
-           // Recheck since global lock was released
-           // block should belong to a file
-           bc = getBlockCollection(block);
-           // abandoned block or block reopened for append
-           if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-             neededReplications.remove(block, priority); // remove from neededReplications
-             rw.targets = null;
-             continue;
-           }
-           requiredReplication = getExpectedReplicaNum(block);
- 
-           // do not schedule more if enough replicas is already pending
-           NumberReplicas numReplicas = countNodes(block);
-           numEffectiveReplicas = numReplicas.liveReplicas() +
-             pendingReplications.getNumReplicas(block);
- 
-           if (numEffectiveReplicas >= requiredReplication) {
-             if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                  (blockHasEnoughRacks(block, requiredReplication)) ) {
-               neededReplications.remove(block, priority); // remove from neededReplications
-               rw.targets = null;
-               blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-                       " it has enough replicas", block);
-               continue;
-             }
-           }
- 
-           if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-                (!blockHasEnoughRacks(block, requiredReplication)) ) {
-             if (rw.srcNodes[0].getNetworkLocation().equals(
-                 targets[0].getDatanodeDescriptor().getNetworkLocation())) {
-               //No use continuing, unless a new rack in this case
-               continue;
-             }
-           }
- 
-           // Add block to the to be replicated list
-           if (block.isStriped()) {
-             assert rw instanceof ErasureCodingWork;
-             assert rw.targets.length > 0;
-             String src = getBlockCollection(block).getName();
-             ErasureCodingZone ecZone = null;
-             try {
-               ecZone = namesystem.getErasureCodingZoneForPath(src);
-             } catch (IOException e) {
-               blockLog
-                   .warn("Failed to get the EC zone for the file {} ", src);
-             }
-             if (ecZone == null) {
-               blockLog.warn("No erasure coding policy found for the file {}. "
-                   + "So cannot proceed for recovery", src);
-               // TODO: we may have to revisit later for what we can do better to
-               // handle this case.
-               continue;
-             }
-             rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
-                 new ExtendedBlock(namesystem.getBlockPoolId(), block),
-                 rw.srcNodes, rw.targets,
-                 ((ErasureCodingWork) rw).liveBlockIndicies,
-                 ecZone.getErasureCodingPolicy());
-           } else {
-             rw.srcNodes[0].addBlockToBeReplicated(block, targets);
-           }
-           scheduledWork++;
-           DatanodeStorageInfo.incrementBlocksScheduled(targets);
- 
-           // Move the block-replication into a "pending" state.
-           // The reason we use 'pending' is so we can retry
-           // replications that fail after an appropriate amount of time.
-           pendingReplications.increment(block,
-               DatanodeStorageInfo.toDatanodeDescriptors(targets));
-           blockLog.debug("BLOCK* block {} is moved from neededReplications to "
-                   + "pendingReplications", block);
- 
-           // remove from neededReplications
-           if(numEffectiveReplicas + targets.length >= requiredReplication) {
-             neededReplications.remove(block, priority); // remove from neededReplications
 -          if (validateReplicationWork(rw)) {
++          if (validateRecoveryWork(rw)) {
+             scheduledWork++;
            }
          }
        }
@@@ -1637,16 -1375,16 +1473,16 @@@
  
      if (blockLog.isInfoEnabled()) {
        // log which blocks have been scheduled for replication
 -      for(ReplicationWork rw : work){
 +      for(BlockRecoveryWork rw : recovWork){
-         DatanodeStorageInfo[] targets = rw.targets;
+         DatanodeStorageInfo[] targets = rw.getTargets();
          if (targets != null && targets.length != 0) {
            StringBuilder targetList = new StringBuilder("datanode(s)");
            for (DatanodeStorageInfo target : targets) {
              targetList.append(' ');
              targetList.append(target.getDatanodeDescriptor());
            }
-           blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
-               rw.block, targetList);
 -          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
++          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(),
+               rw.getBlock(), targetList);
          }
        }
      }
@@@ -1658,6 -1396,118 +1494,160 @@@
      return scheduledWork;
    }
  
+   boolean hasEnoughEffectiveReplicas(BlockInfo block,
+       NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
+     return (numEffectiveReplicas >= required) &&
 -        (pendingReplicaNum > 0 || blockHasEnoughRacks(block));
++        (pendingReplicaNum > 0 || blockHasEnoughRacks(block, required));
+   }
+ 
 -  private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
++  private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) {
+     // block should belong to a file
+     BlockCollection bc = getBlockCollection(block);
+     // abandoned block or block reopened for append
+     if (bc == null
+         || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+       // remove from neededReplications
+       neededReplications.remove(block, priority);
+       return null;
+     }
+ 
+     short requiredReplication = getExpectedReplicaNum(block);
+ 
+     // get a source data-node
+     List<DatanodeDescriptor> containingNodes = new ArrayList<>();
+     List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
+     NumberReplicas numReplicas = new NumberReplicas();
 -    DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
 -        liveReplicaNodes, numReplicas, priority);
 -    if (srcNode == null) { // block can not be replicated from any node
 -      LOG.debug("Block " + block + " cannot be repl from any node");
++    List<Short> liveBlockIndices = new ArrayList<>();
++    final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
++        containingNodes, liveReplicaNodes, numReplicas,
++        liveBlockIndices, priority);
++    if(srcNodes == null || srcNodes.length == 0) {
++      // block can not be recovered from any node
++      LOG.debug("Block " + block + " cannot be recovered " +
++          "from any node");
+       return null;
+     }
+ 
+     // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
+     // not included in the numReplicas.liveReplicas() count
+     assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
+ 
+     int pendingNum = pendingReplications.getNumReplicas(block);
+     if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+         requiredReplication)) {
+       neededReplications.remove(block, priority);
+       blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+           " it has enough replicas", block);
+       return null;
+     }
+ 
+     final int additionalReplRequired;
+     if (numReplicas.liveReplicas() < requiredReplication) {
+       additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
+           - pendingNum;
+     } else {
+       additionalReplRequired = 1; // Needed on a new rack
+     }
 -    return new ReplicationWork(block, bc, srcNode, containingNodes,
 -        liveReplicaNodes, additionalReplRequired, priority);
++
++    if (block.isStriped()) {
++      short[] indices = new short[liveBlockIndices.size()];
++      for (int i = 0 ; i < liveBlockIndices.size(); i++) {
++        indices[i] = liveBlockIndices.get(i);
++      }
++      return new ErasureCodingWork(block, bc, srcNodes,
++          containingNodes, liveReplicaNodes, additionalReplRequired,
++          priority, indices);
++    } else {
++      return new ReplicationWork(block, bc, srcNodes,
++          containingNodes, liveReplicaNodes, additionalReplRequired,
++          priority);
++    }
+   }
+ 
 -  private boolean validateReplicationWork(ReplicationWork rw) {
++  private boolean validateRecoveryWork(BlockRecoveryWork rw) {
+     BlockInfo block = rw.getBlock();
+     int priority = rw.getPriority();
+     // Recheck since global lock was released
+     // block should belong to a file
+     BlockCollection bc = getBlockCollection(block);
+     // abandoned block or block reopened for append
+     if (bc == null
+         || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+       neededReplications.remove(block, priority);
+       rw.resetTargets();
+       return false;
+     }
+ 
+     // do not schedule more if enough replicas is already pending
+     final short requiredReplication = getExpectedReplicaNum(block);
+     NumberReplicas numReplicas = countNodes(block);
+     final int pendingNum = pendingReplications.getNumReplicas(block);
+     if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+         requiredReplication)) {
+       neededReplications.remove(block, priority);
+       rw.resetTargets();
+       blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+           " it has enough replicas", block);
+       return false;
+     }
+ 
+     DatanodeStorageInfo[] targets = rw.getTargets();
+     if ( (numReplicas.liveReplicas() >= requiredReplication) &&
 -        (!blockHasEnoughRacks(block)) ) {
 -      if (rw.getSrcNode().getNetworkLocation().equals(
++        (!blockHasEnoughRacks(block, requiredReplication)) ) {
++      if (rw.getSrcNodes()[0].getNetworkLocation().equals(
+           targets[0].getDatanodeDescriptor().getNetworkLocation())) {
+         //No use continuing, unless a new rack in this case
+         return false;
+       }
+     }
+ 
 -    // Add block to the to be replicated list
 -    rw.getSrcNode().addBlockToBeReplicated(block, targets);
++    // Add block to the to be recovered list
++    if (block.isStriped()) {
++      assert rw instanceof ErasureCodingWork;
++      assert rw.getTargets().length > 0;
++      String src = getBlockCollection(block).getName();
++      ErasureCodingZone ecZone = null;
++      try {
++        ecZone = namesystem.getErasureCodingZoneForPath(src);
++      } catch (IOException e) {
++        blockLog
++            .warn("Failed to get the EC zone for the file {} ", src);
++      }
++      if (ecZone == null) {
++        blockLog.warn("No erasure coding policy found for the file {}. "
++            + "So cannot proceed for recovery", src);
++        // TODO: we may have to revisit later for what we can do better to
++        // handle this case.
++        return false;
++      }
++      rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
++          new ExtendedBlock(namesystem.getBlockPoolId(), block),
++          rw.getSrcNodes(), rw.getTargets(),
++          ((ErasureCodingWork) rw).getLiveBlockIndicies(),
++          ecZone.getErasureCodingPolicy());
++    } else {
++      rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets);
++    }
++
+     DatanodeStorageInfo.incrementBlocksScheduled(targets);
+ 
+     // Move the block-replication into a "pending" state.
+     // The reason we use 'pending' is so we can retry
+     // replications that fail after an appropriate amount of time.
+     pendingReplications.increment(block,
+         DatanodeStorageInfo.toDatanodeDescriptors(targets));
+     blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+         + "pendingReplications", block);
+ 
+     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
+     // remove from neededReplications
+     if(numEffectiveReplicas + targets.length >= requiredReplication) {
+       neededReplications.remove(block, priority);
+     }
+     return true;
+   }
+ 
    /** Choose target for WebHDFS redirection. */
    public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
        DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
@@@ -1916,59 -1755,7 +1906,17 @@@
      }
    }
  
 +  private static class BlockInfoToAdd {
 +    final BlockInfo stored;
 +    final Block reported;
 +
 +    BlockInfoToAdd(BlockInfo stored, Block reported) {
 +      this.stored = stored;
 +      this.reported = reported;
 +    }
 +  }
 +
    /**
-    * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
-    * list of blocks that should be considered corrupt due to a block report.
-    */
-   private static class BlockToMarkCorrupt {
-     /**
-      * The corrupted block in a datanode. This is the one reported by the
-      * datanode.
-      */
-     final Block corrupted;
-     /** The corresponding block stored in the BlockManager. */
-     final BlockInfo stored;
-     /** The reason to mark corrupt. */
-     final String reason;
-     /** The reason code to be stored */
-     final Reason reasonCode;
- 
-     BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
-         Reason reasonCode) {
-       Preconditions.checkNotNull(corrupted, "corrupted is null");
-       Preconditions.checkNotNull(stored, "stored is null");
- 
-       this.corrupted = corrupted;
-       this.stored = stored;
-       this.reason = reason;
-       this.reasonCode = reasonCode;
-     }
- 
-     BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
-         String reason, Reason reasonCode) {
-       this(corrupted, stored, reason, reasonCode);
-       //the corrupted block in datanode has a different generation stamp
-       corrupted.setGenerationStamp(gs);
-     }
- 
-     @Override
-     public String toString() {
-       return corrupted + "("
-           + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
-     }
-   }
- 
-   /**
     * The given storage is reporting all its blocks.
     * Update the (storage-->block list) and (block-->storage list) maps.
     *
@@@ -2721,8 -2484,8 +2669,8 @@@
      // Now check for completion of blocks and safe block count
      int numCurrentReplica = countLiveNodes(storedBlock);
      if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
 -        && numCurrentReplica >= minReplication) {
 +        && hasMinStorage(storedBlock, numCurrentReplica)) {
-       completeBlock(getBlockCollection(storedBlock), storedBlock, false);
+       completeBlock(storedBlock, false);
      } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
        // check whether safe replication is reached for the block
        // only complete blocks are counted towards that.
@@@ -2796,8 -2558,8 +2744,8 @@@
        + pendingReplications.getNumReplicas(storedBlock);
  
      if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
 -        numLiveReplicas >= minReplication) {
 +        hasMinStorage(storedBlock, numLiveReplicas)) {
-       storedBlock = completeBlock(bc, storedBlock, false);
+       completeBlock(storedBlock, false);
      } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
        // check whether safe replication is reached for the block
        // only complete blocks are counted towards that
@@@ -4171,138 -3740,7 +4119,32 @@@
          null);
    }
  
 +  public static LocatedStripedBlock newLocatedStripedBlock(
 +      ExtendedBlock b, DatanodeStorageInfo[] storages,
 +      int[] indices, long startOffset, boolean corrupt) {
 +    // startOffset is unknown
 +    return new LocatedStripedBlock(
 +        b, DatanodeStorageInfo.toDatanodeInfos(storages),
 +        DatanodeStorageInfo.toStorageIDs(storages),
 +        DatanodeStorageInfo.toStorageTypes(storages),
 +        indices, startOffset, corrupt,
 +        null);
 +  }
 +
 +  public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
 +      DatanodeStorageInfo[] locs, long offset) throws IOException {
 +    final LocatedBlock lb;
 +    if (info.isStriped()) {
 +      lb = newLocatedStripedBlock(eb, locs,
 +          info.getUnderConstructionFeature().getBlockIndices(),
 +          offset, false);
 +    } else {
 +      lb = newLocatedBlock(eb, locs, offset, false);
 +    }
 +    return lb;
 +  }
 +
    /**
-    * This class is used internally by {@link this#computeRecoveryWorkForBlocks}
-    * to represent a task to recover a block through replication or erasure
-    * coding. Recovery is done by transferring data from srcNodes to targets
-    */
-   private abstract static class BlockRecoveryWork {
-     final BlockInfo block;
-     final BlockCollection bc;
- 
-     /**
-      * An erasure coding recovery task has multiple source nodes.
-      * A replication task only has 1 source node, stored on top of the array
-      */
-     final DatanodeDescriptor[] srcNodes;
-     /** Nodes containing the block; avoid them in choosing new targets */
-     final List<DatanodeDescriptor> containingNodes;
-     /** Required by {@link BlockPlacementPolicy#chooseTarget} */
-     final List<DatanodeStorageInfo> liveReplicaStorages;
-     final int additionalReplRequired;
- 
-     DatanodeStorageInfo[] targets;
-     final int priority;
- 
-     BlockRecoveryWork(BlockInfo block,
-         BlockCollection bc,
-         DatanodeDescriptor[] srcNodes,
-         List<DatanodeDescriptor> containingNodes,
-         List<DatanodeStorageInfo> liveReplicaStorages,
-         int additionalReplRequired,
-         int priority) {
-       this.block = block;
-       this.bc = bc;
-       this.srcNodes = srcNodes;
-       this.containingNodes = containingNodes;
-       this.liveReplicaStorages = liveReplicaStorages;
-       this.additionalReplRequired = additionalReplRequired;
-       this.priority = priority;
-       this.targets = null;
-     }
- 
-     abstract void chooseTargets(BlockPlacementPolicy blockplacement,
-         BlockStoragePolicySuite storagePolicySuite,
-         Set<Node> excludedNodes);
-   }
- 
-   private static class ReplicationWork extends BlockRecoveryWork {
-     ReplicationWork(BlockInfo block,
-         BlockCollection bc,
-         DatanodeDescriptor[] srcNodes,
-         List<DatanodeDescriptor> containingNodes,
-         List<DatanodeStorageInfo> liveReplicaStorages,
-         int additionalReplRequired,
-         int priority) {
-       super(block, bc, srcNodes, containingNodes,
-           liveReplicaStorages, additionalReplRequired, priority);
-       LOG.debug("Creating a ReplicationWork to recover " + block);
-     }
- 
-     @Override
-     void chooseTargets(BlockPlacementPolicy blockplacement,
-         BlockStoragePolicySuite storagePolicySuite,
-         Set<Node> excludedNodes) {
-       assert srcNodes.length > 0
-           : "At least 1 source node should have been selected";
-       try {
-         targets = blockplacement.chooseTarget(bc.getName(),
-             additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
-             excludedNodes, block.getNumBytes(),
-             storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
-       } finally {
-         srcNodes[0].decrementPendingReplicationWithoutTargets();
-       }
-     }
-   }
- 
-   private static class ErasureCodingWork extends BlockRecoveryWork {
-     final short[] liveBlockIndicies;
- 
-     ErasureCodingWork(BlockInfo block,
-         BlockCollection bc,
-         DatanodeDescriptor[] srcNodes,
-         List<DatanodeDescriptor> containingNodes,
-         List<DatanodeStorageInfo> liveReplicaStorages,
-         int additionalReplRequired,
-         int priority, short[] liveBlockIndicies) {
-       super(block, bc, srcNodes, containingNodes,
-           liveReplicaStorages, additionalReplRequired, priority);
-       this.liveBlockIndicies = liveBlockIndicies;
-       LOG.debug("Creating an ErasureCodingWork to recover " + block);
-     }
- 
-     @Override
-     void chooseTargets(BlockPlacementPolicy blockplacement,
-         BlockStoragePolicySuite storagePolicySuite,
-         Set<Node> excludedNodes) {
-       try {
-         // TODO: new placement policy for EC considering multiple writers
-         targets = blockplacement.chooseTarget(bc.getName(),
-             additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
-             excludedNodes, block.getNumBytes(),
-             storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
-       } finally {
-       }
-     }
-   }
- 
-   /**
     * A simple result enum for the result of
     * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
     */

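The computeRecoveryWorkForBlocks changes above are spread over several @@@ hunks; taken together they split the old single loop into three steps. The outline below is a condensed paraphrase of those hunks (locking details, excluded-node bookkeeping and logging are elided, and the recovWork declaration is assumed), not the exact committed code:

    // Step 1: under the namesystem write lock, turn the blocks selected for
    // recovery into tasks (ReplicationWork or ErasureCodingWork).
    List<BlockRecoveryWork> recovWork = new LinkedList<>();
    for (int priority = 0; priority < blocksToRecover.size(); priority++) {
      for (BlockInfo block : blocksToRecover.get(priority)) {
        BlockRecoveryWork rw = scheduleRecovery(block, priority);
        if (rw != null) {
          recovWork.add(rw);
        }
      }
    }

    // Step 2: with the lock released, choose target storages for each task,
    // picking the placement policy by whether the block is striped.
    for (BlockRecoveryWork rw : recovWork) {
      BlockPlacementPolicy policy =
          placementPolicies.getPolicy(rw.getBlock().isStriped());
      rw.chooseTargets(policy, storagePolicySuite, excludedNodes);
    }

    // Step 3: re-acquire the write lock, recheck each task and, if it is
    // still needed, hand it to the chosen datanodes.
    for (BlockRecoveryWork rw : recovWork) {
      DatanodeStorageInfo[] targets = rw.getTargets();
      if (targets != null && targets.length > 0 && validateRecoveryWork(rw)) {
        scheduledWork++;
      }
    }
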
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java
index 0000000,0000000..ed546df
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java
@@@ -1,0 -1,0 +1,111 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.server.blockmanagement;
++
++import org.apache.hadoop.net.Node;
++
++import java.util.Collections;
++import java.util.List;
++import java.util.Set;
++
++/**
++ * This class is used internally by
++ * {@link BlockManager#computeRecoveryWorkForBlocks} to represent a task to
++ * recover a block through replication or erasure coding. Recovery is done by
++ * transferring data from srcNodes to targets
++ */
++abstract class BlockRecoveryWork {
++  private final BlockInfo block;
++
++  private final BlockCollection bc;
++
++  /**
++   * An erasure coding recovery task has multiple source nodes.
++   * A replication task only has 1 source node, stored on top of the array
++   */
++  private final DatanodeDescriptor[] srcNodes;
++  /** Nodes containing the block; avoid them in choosing new targets */
++  private final List<DatanodeDescriptor> containingNodes;
++  /** Required by {@link BlockPlacementPolicy#chooseTarget} */
++  private  final List<DatanodeStorageInfo> liveReplicaStorages;
++  private final int additionalReplRequired;
++
++  private DatanodeStorageInfo[] targets;
++  private final int priority;
++
++  public BlockRecoveryWork(BlockInfo block,
++      BlockCollection bc,
++      DatanodeDescriptor[] srcNodes,
++      List<DatanodeDescriptor> containingNodes,
++      List<DatanodeStorageInfo> liveReplicaStorages,
++      int additionalReplRequired,
++      int priority) {
++    this.block = block;
++    this.bc = bc;
++    this.srcNodes = srcNodes;
++    this.containingNodes = containingNodes;
++    this.liveReplicaStorages = liveReplicaStorages;
++    this.additionalReplRequired = additionalReplRequired;
++    this.priority = priority;
++    this.targets = null;
++  }
++
++  DatanodeStorageInfo[] getTargets() {
++    return targets;
++  }
++
++  void resetTargets() {
++    this.targets = null;
++  }
++
++  void setTargets(DatanodeStorageInfo[] targets) {
++    this.targets = targets;
++  }
++
++  List<DatanodeDescriptor> getContainingNodes() {
++    return Collections.unmodifiableList(containingNodes);
++  }
++
++  public int getPriority() {
++    return priority;
++  }
++
++  public BlockInfo getBlock() {
++    return block;
++  }
++
++  public DatanodeDescriptor[] getSrcNodes() {
++    return srcNodes;
++  }
++
++  BlockCollection getBc() {
++    return bc;
++  }
++
++  List<DatanodeStorageInfo> getLiveReplicaStorages() {
++    return liveReplicaStorages;
++  }
++
++  public int getAdditionalReplRequired() {
++    return additionalReplRequired;
++  }
++
++  abstract void chooseTargets(BlockPlacementPolicy blockplacement,
++      BlockStoragePolicySuite storagePolicySuite,
++      Set<Node> excludedNodes);
++}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
index 0000000,3842e56..a871390
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
@@@ -1,0 -1,87 +1,82 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs.server.blockmanagement;
+ 
+ import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+ 
+ import com.google.common.base.Preconditions;
++import org.apache.hadoop.hdfs.protocol.Block;
+ 
+ /**
+  * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+  * list of blocks that should be considered corrupt due to a block report.
+  */
+ class BlockToMarkCorrupt {
+   /** The corrupted block in a datanode. */
 -  private final BlockInfo corrupted;
++  private final Block corrupted;
+   /** The corresponding block stored in the BlockManager. */
+   private final BlockInfo stored;
+   /** The reason to mark corrupt. */
+   private final String reason;
+   /** The reason code to be stored */
+   private final CorruptReplicasMap.Reason reasonCode;
+ 
 -  BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
++  BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
+       CorruptReplicasMap.Reason reasonCode) {
+     Preconditions.checkNotNull(corrupted, "corrupted is null");
+     Preconditions.checkNotNull(stored, "stored is null");
+ 
+     this.corrupted = corrupted;
+     this.stored = stored;
+     this.reason = reason;
+     this.reasonCode = reasonCode;
+   }
+ 
 -  BlockToMarkCorrupt(BlockInfo stored, String reason,
++  BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, String reason,
+       CorruptReplicasMap.Reason reasonCode) {
 -    this(stored, stored, reason, reasonCode);
 -  }
 -
 -  BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
 -      CorruptReplicasMap.Reason reasonCode) {
 -    this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
 -        reason, reasonCode);
++    this(corrupted, stored, reason, reasonCode);
+     //the corrupted block in datanode has a different generation stamp
 -    corrupted.setGenerationStamp(gs);
++    this.corrupted.setGenerationStamp(gs);
+   }
+ 
+   public boolean isCorruptedDuringWrite() {
+     return stored.getGenerationStamp() > corrupted.getGenerationStamp();
+   }
+ 
 -  public BlockInfo getCorrupted() {
++  public Block getCorrupted() {
+     return corrupted;
+   }
+ 
+   public BlockInfo getStored() {
+     return stored;
+   }
+ 
+   public String getReason() {
+     return reason;
+   }
+ 
+   public Reason getReasonCode() {
+     return reasonCode;
+   }
+ 
+   @Override
+   public String toString() {
+     return corrupted + "("
+         + (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
+   }
+ }

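One hunk in markBlockAsCorrupt above records a corrupt striped replica under the ID of its block group rather than under the reported internal block. A small, concrete illustration of that remapping, using the same ID convention as the TestBlockInfoStriped test at the end of this mail (internal blocks are numbered groupId + index; all variable names here are illustrative, not from the diff):

    // The stored entry in the BlockManager is the whole block group (-1600).
    Block baseBlock = new Block(-1600);
    BlockInfoStriped storedGroup = new BlockInfoStriped(baseBlock,
        ErasureCodingPolicyManager.getSystemDefaultPolicy());

    // A datanode reports internal block #1 of that group (-1599) as corrupt.
    Block reported = new Block(-1599);
    Block corrupted = new Block(reported);
    if (storedGroup.isStriped()) {
      // Re-key by the block group so corruptReplicas tracks -1600, not -1599.
      corrupted.setBlockId(storedGroup.getBlockId());
    }
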
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 29e541c,0b398c5..b258f06
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@@ -38,9 -39,8 +39,10 @@@ import org.apache.hadoop.fs.StorageType
  import org.apache.hadoop.hdfs.protocol.Block;
  import org.apache.hadoop.hdfs.protocol.DatanodeID;
  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 +import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
  import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
  import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
  import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
  import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@@ -696,29 -663,32 +698,39 @@@ public class DatanodeDescriptor extend
      }
    }
  
 +  @VisibleForTesting
 +  public boolean containsInvalidateBlock(Block block) {
 +    synchronized (invalidateBlocks) {
 +      return invalidateBlocks.contains(block);
 +    }
 +  }
 +
    /**
-    * Return the sum of remaining spaces of the specified type. If the remaining
-    * space of a storage is less than minSize, it won't be counted toward the
-    * sum.
+    * Find whether the datanode contains good storage of given type to
+    * place block of size <code>blockSize</code>.
+    *
+    * <p>Currently datanode only cares about the storage type, in this
+    * method, the first storage of given type we see is returned.
     *
-    * @param t The storage type. If null, the type is ignored.
-    * @param minSize The minimum free space required.
-    * @return the sum of remaining spaces that are bigger than minSize.
+    * @param t requested storage type
+    * @param blockSize requested block size
+    * @return
     */
-   public long getRemaining(StorageType t, long minSize) {
+   public DatanodeStorageInfo chooseStorage4Block(StorageType t,
+       long blockSize) {
+     final long requiredSize =
+         blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
+     final long scheduledSize = blockSize * getBlocksScheduled(t);
      long remaining = 0;
+     DatanodeStorageInfo storage = null;
      for (DatanodeStorageInfo s : getStorageInfos()) {
        if (s.getState() == State.NORMAL &&
-           (t == null || s.getStorageType() == t)) {
+           s.getStorageType() == t) {
+         if (storage == null) {
+           storage = s;
+         }
          long r = s.getRemaining();
-         if (r >= minSize) {
+         if (r >= requiredSize) {
            remaining += r;
          }
        }

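The DatanodeDescriptor hunk above replaces getRemaining(StorageType, long) with chooseStorage4Block(StorageType, long), which picks a concrete storage for a block instead of only summing free space. A minimal usage sketch follows; the caller is hypothetical, and it assumes the method returns null when the node has no suitable storage of the requested type (the tail of the method is truncated in this hunk):

    // Hypothetical caller, e.g. while evaluating a candidate datanode.
    // candidate is a DatanodeDescriptor; blockSize is the size of the block
    // being placed.
    DatanodeStorageInfo storage =
        candidate.chooseStorage4Block(StorageType.DISK, blockSize);
    if (storage == null) {
      // No NORMAL storage of the requested type with enough remaining space;
      // skip this datanode and try another candidate.
    } else {
      // Place the new block on the returned storage.
    }
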
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 0000000,0000000..761d6d0
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@@ -1,0 -1,0 +1,60 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.server.blockmanagement;
++
++import org.apache.hadoop.net.Node;
++
++import java.util.List;
++import java.util.Set;
++
++class ErasureCodingWork extends BlockRecoveryWork {
++  private final short[] liveBlockIndicies;
++
++  public ErasureCodingWork(BlockInfo block,
++      BlockCollection bc,
++      DatanodeDescriptor[] srcNodes,
++      List<DatanodeDescriptor> containingNodes,
++      List<DatanodeStorageInfo> liveReplicaStorages,
++      int additionalReplRequired,
++      int priority, short[] liveBlockIndicies) {
++    super(block, bc, srcNodes, containingNodes,
++        liveReplicaStorages, additionalReplRequired, priority);
++    this.liveBlockIndicies = liveBlockIndicies;
++    BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block);
++  }
++
++  short[] getLiveBlockIndicies() {
++    return liveBlockIndicies;
++  }
++
++  @Override
++  void chooseTargets(BlockPlacementPolicy blockplacement,
++      BlockStoragePolicySuite storagePolicySuite,
++      Set<Node> excludedNodes) {
++    try {
++      // TODO: new placement policy for EC considering multiple writers
++      DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
++          getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
++          getLiveReplicaStorages(), false, excludedNodes,
++          getBlock().getNumBytes(),
++          storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
++      setTargets(chosenTargets);
++    } finally {
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
index 0000000,f8a6dad..8266f45
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@@ -1,0 -1,87 +1,53 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs.server.blockmanagement;
+ 
+ import org.apache.hadoop.net.Node;
+ 
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Set;
+ 
 -class ReplicationWork {
 -  private final BlockInfo block;
 -  private final BlockCollection bc;
 -  private final DatanodeDescriptor srcNode;
 -  private final int additionalReplRequired;
 -  private final int priority;
 -  private final List<DatanodeDescriptor> containingNodes;
 -  private final List<DatanodeStorageInfo> liveReplicaStorages;
 -  private DatanodeStorageInfo[] targets;
 -
++class ReplicationWork extends BlockRecoveryWork {
+   public ReplicationWork(BlockInfo block, BlockCollection bc,
 -      DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
++      DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes,
+       List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
+       int priority) {
 -    this.block = block;
 -    this.bc = bc;
 -    this.srcNode = srcNode;
 -    this.srcNode.incrementPendingReplicationWithoutTargets();
 -    this.containingNodes = containingNodes;
 -    this.liveReplicaStorages = liveReplicaStorages;
 -    this.additionalReplRequired = additionalReplRequired;
 -    this.priority = priority;
 -    this.targets = null;
++    super(block, bc, srcNodes, containingNodes,
++        liveReplicaStorages, additionalReplRequired, priority);
++    BlockManager.LOG.debug("Creating a ReplicationWork to recover " + block);
+   }
+ 
++  @Override
+   void chooseTargets(BlockPlacementPolicy blockplacement,
+       BlockStoragePolicySuite storagePolicySuite,
+       Set<Node> excludedNodes) {
++    assert getSrcNodes().length > 0
++        : "At least 1 source node should have been selected";
+     try {
 -      targets = blockplacement.chooseTarget(bc.getName(),
 -          additionalReplRequired, srcNode, liveReplicaStorages, false,
 -          excludedNodes, block.getNumBytes(),
 -          storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
++      DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
++          getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
++          getLiveReplicaStorages(), false, excludedNodes,
++          getBlock().getNumBytes(),
++          storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
++      setTargets(chosenTargets);
+     } finally {
 -      srcNode.decrementPendingReplicationWithoutTargets();
++      getSrcNodes()[0].decrementPendingReplicationWithoutTargets();
+     }
+   }
 -
 -  DatanodeStorageInfo[] getTargets() {
 -    return targets;
 -  }
 -
 -  void resetTargets() {
 -    this.targets = null;
 -  }
 -
 -  List<DatanodeDescriptor> getContainingNodes() {
 -    return Collections.unmodifiableList(containingNodes);
 -  }
 -
 -  public int getPriority() {
 -    return priority;
 -  }
 -
 -  public BlockInfo getBlock() {
 -    return block;
 -  }
 -
 -  public DatanodeDescriptor getSrcNode() {
 -    return srcNode;
 -  }
+ }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
index 3c77120,0000000..6cc1dcd
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
@@@ -1,250 -1,0 +1,220 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs.server.blockmanagement;
 +
 +import org.apache.hadoop.hdfs.DFSTestUtil;
 +import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.mockito.internal.util.reflection.Whitebox;
 +
 +import java.io.DataOutput;
 +import java.io.DataOutputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.nio.ByteBuffer;
 +
 +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
 +import static org.junit.Assert.assertArrayEquals;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.fail;
 +
 +/**
 + * Test {@link BlockInfoStriped}
 + */
 +public class TestBlockInfoStriped {
 +  private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
 +  private static final long BASE_ID = -1600;
 +  private static final Block baseBlock = new Block(BASE_ID);
 +  private static final ErasureCodingPolicy testECPolicy
 +      = ErasureCodingPolicyManager.getSystemDefaultPolicy();
 +  private final BlockInfoStriped info = new BlockInfoStriped(baseBlock,
 +      testECPolicy);
 +
 +  private Block[] createReportedBlocks(int num) {
 +    Block[] blocks = new Block[num];
 +    for (int i = 0; i < num; i++) {
 +      blocks[i] = new Block(BASE_ID + i);
 +    }
 +    return blocks;
 +  }
 +
 +  /**
 +   * Test adding storage and reported block
 +   */
 +  @Test
 +  public void testAddStorage() {
 +    // first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete
 +    // group of blocks/storages
 +    DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos(
 +        TOTAL_NUM_BLOCKS);
 +    Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
 +    int i = 0;
 +    for (; i < storageInfos.length; i += 2) {
 +      info.addStorage(storageInfos[i], blocks[i]);
 +      Assert.assertEquals(i/2 + 1, info.numNodes());
 +    }
 +    i /= 2;
 +    for (int j = 1; j < storageInfos.length; j += 2) {
 +      Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j]));
 +      Assert.assertEquals(i + (j+1)/2, info.numNodes());
 +    }
 +
 +    // check
 +    byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
 +    i = 0;
 +    for (DatanodeStorageInfo storage : storageInfos) {
 +      int index = info.findStorageInfo(storage);
 +      Assert.assertEquals(i++, index);
 +      Assert.assertEquals(index, indices[index]);
 +    }
 +
 +    // the same block is reported from the same storage twice
 +    i = 0;
 +    for (DatanodeStorageInfo storage : storageInfos) {
 +      Assert.assertTrue(info.addStorage(storage, blocks[i++]));
 +    }
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.numNodes());
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
 +    i = 0;
 +    for (DatanodeStorageInfo storage : storageInfos) {
 +      int index = info.findStorageInfo(storage);
 +      Assert.assertEquals(i++, index);
 +      Assert.assertEquals(index, indices[index]);
 +    }
 +
 +    // the same block is reported from another storage
 +    DatanodeStorageInfo[] storageInfos2 = DFSTestUtil.createDatanodeStorageInfos(
 +        TOTAL_NUM_BLOCKS * 2);
 +    // only add the second half of info2
 +    for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
 +      info.addStorage(storageInfos2[i], blocks[i % TOTAL_NUM_BLOCKS]);
 +      Assert.assertEquals(i + 1, info.getCapacity());
 +      Assert.assertEquals(i + 1, info.numNodes());
 +      indices = (byte[]) Whitebox.getInternalState(info, "indices");
 +      Assert.assertEquals(i + 1, indices.length);
 +    }
 +    for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
 +      int index = info.findStorageInfo(storageInfos2[i]);
 +      Assert.assertEquals(i, index);
 +      Assert.assertEquals(index - TOTAL_NUM_BLOCKS, indices[index]);
 +    }
 +  }
 +
 +  @Test
 +  public void testRemoveStorage() {
 +    // first add TOTAL_NUM_BLOCKS into the BlockInfoStriped
 +    DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
 +        TOTAL_NUM_BLOCKS);
 +    Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
 +    for (int i = 0; i < storages.length; i++) {
 +      info.addStorage(storages[i], blocks[i]);
 +    }
 +
 +    // remove two storages
 +    info.removeStorage(storages[0]);
 +    info.removeStorage(storages[2]);
 +
 +    // check
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
 +    byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
 +    for (int i = 0; i < storages.length; i++) {
 +      int index = info.findStorageInfo(storages[i]);
 +      if (i != 0 && i != 2) {
 +        Assert.assertEquals(i, index);
 +        Assert.assertEquals(index, indices[index]);
 +      } else {
 +        Assert.assertEquals(-1, index);
 +        Assert.assertEquals(-1, indices[i]);
 +      }
 +    }
 +
 +    // the same block is reported from another storage
 +    DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos(
 +        TOTAL_NUM_BLOCKS * 2);
 +    for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
 +      info.addStorage(storages2[i], blocks[i % TOTAL_NUM_BLOCKS]);
 +    }
 +    // now we should have TOTAL_NUM_BLOCKS * 2 - 2 storages
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.numNodes());
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
 +    indices = (byte[]) Whitebox.getInternalState(info, "indices");
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
 +    int j = TOTAL_NUM_BLOCKS;
 +    for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
 +      int index = info.findStorageInfo(storages2[i]);
 +      if (i == TOTAL_NUM_BLOCKS || i == TOTAL_NUM_BLOCKS + 2) {
 +        Assert.assertEquals(i - TOTAL_NUM_BLOCKS, index);
 +      } else {
 +        Assert.assertEquals(j++, index);
 +      }
 +    }
 +
 +    // remove the storages from storages2
 +    for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
 +      info.removeStorage(storages2[i + TOTAL_NUM_BLOCKS]);
 +    }
 +    // now we should have TOTAL_NUM_BLOCKS - 2 storages
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
 +    indices = (byte[]) Whitebox.getInternalState(info, "indices");
 +    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
 +    for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
 +      if (i == 0 || i == 2) {
 +        int index = info.findStorageInfo(storages2[i + TOTAL_NUM_BLOCKS]);
 +        Assert.assertEquals(-1, index);
 +      } else {
 +        int index = info.findStorageInfo(storages[i]);
 +        Assert.assertEquals(i, index);
 +      }
 +    }
 +    for (int i = TOTAL_NUM_BLOCKS; i < TOTAL_NUM_BLOCKS * 2 - 2; i++) {
 +      Assert.assertEquals(-1, indices[i]);
 +      Assert.assertNull(info.getDatanode(i));
 +    }
 +  }
 +
 +  @Test
-   public void testReplaceBlock() {
-     DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
-         TOTAL_NUM_BLOCKS);
-     Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
-     // add block/storage 0, 2, 4 into the BlockInfoStriped
-     for (int i = 0; i < storages.length; i += 2) {
-       Assert.assertEquals(AddBlockResult.ADDED,
-           storages[i].addBlock(info, blocks[i]));
-     }
- 
-     BlockInfoStriped newBlockInfo = new BlockInfoStriped(info,
-         info.getErasureCodingPolicy());
-     newBlockInfo.setBlockCollectionId(info.getBlockCollectionId());
-     info.replaceBlock(newBlockInfo);
- 
-     // make sure the newBlockInfo is correct
-     byte[] indices = (byte[]) Whitebox.getInternalState(newBlockInfo, "indices");
-     for (int i = 0; i < storages.length; i += 2) {
-       int index = newBlockInfo.findStorageInfo(storages[i]);
-       Assert.assertEquals(i, index);
-       Assert.assertEquals(index, indices[i]);
- 
-       // make sure the newBlockInfo is added to the linked list of the storage
-       Assert.assertSame(newBlockInfo, storages[i].getBlockListHeadForTesting());
-       Assert.assertEquals(1, storages[i].numBlocks());
-       Assert.assertNull(newBlockInfo.getNext());
-     }
-   }
- 
-   @Test
 +  public void testWrite() {
 +    long blkID = 1;
 +    long numBytes = 1;
 +    long generationStamp = 1;
 +    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE * 3);
 +    byteBuffer.putLong(blkID).putLong(numBytes).putLong(generationStamp);
 +
 +    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
 +    DataOutput out = new DataOutputStream(byteStream);
 +    BlockInfoStriped blk = new BlockInfoStriped(new Block(blkID, numBytes,
 +        generationStamp), testECPolicy);
 +
 +    try {
 +      blk.write(out);
 +    } catch (Exception ex) {
 +      fail("testWrite error: " + ex.getMessage());
 +    }
 +    assertEquals(byteBuffer.array().length, byteStream.toByteArray().length);
 +    assertArrayEquals(byteBuffer.array(), byteStream.toByteArray());
 +  }
 +}
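
The assertions above encode a simple invariant for striped blocks: a pair of parallel arrays records which storage holds which internal block of the group, a storage reporting internal block k lands in slot k when possible, conflicting reports are appended past the group width, and removing a storage clears its slot without ever shrinking capacity. The following is a minimal, self-contained sketch of that invariant only; it is not the actual BlockInfoStriped code, and the class and helper names are hypothetical:

import java.util.Arrays;

// A minimal sketch (NOT the real BlockInfoStriped implementation) of the
// slot/index bookkeeping that testAddStorage and testRemoveStorage verify.
// The real class stores DatanodeStorageInfo references and derives the
// internal index from the reported block ID.
class StripedStorageMapSketch {
  private final int groupWidth;   // data blocks + parity blocks
  private Object[] storages;      // stand-in for DatanodeStorageInfo references
  private byte[] indices;         // internal block index held by each slot, -1 if empty

  StripedStorageMapSketch(int groupWidth) {
    this.groupWidth = groupWidth;
    this.storages = new Object[groupWidth];
    this.indices = new byte[groupWidth];
    Arrays.fill(this.indices, (byte) -1);
  }

  // A storage reporting internal block k is preferably recorded in slot k;
  // if that slot is held by a different storage, the entry is appended past
  // the group width (growing capacity by one slot if necessary).
  void addStorage(Object storage, int blockIndex) {
    int slot = blockIndex;
    Object old = storages[slot];
    if (old != null && old != storage) {
      if (findStorageInfo(storage) >= 0) {
        return;                       // this storage is already recorded
      }
      slot = findFreeSlotOrGrow();
    }
    storages[slot] = storage;
    indices[slot] = (byte) blockIndex;
  }

  // Removing a storage clears its slot but never shrinks the arrays, which is
  // why getCapacity() stays at TOTAL_NUM_BLOCKS * 2 - 2 in testRemoveStorage.
  void removeStorage(Object storage) {
    int slot = findStorageInfo(storage);
    if (slot >= 0) {
      storages[slot] = null;
      indices[slot] = (byte) -1;
    }
  }

  int findStorageInfo(Object storage) {
    for (int slot = 0; slot < storages.length; slot++) {
      if (storages[slot] == storage) {
        return slot;
      }
    }
    return -1;
  }

  int getCapacity() {
    return storages.length;
  }

  private int findFreeSlotOrGrow() {
    for (int slot = groupWidth; slot < storages.length; slot++) {
      if (storages[slot] == null) {
        return slot;
      }
    }
    int slot = storages.length;       // grow both parallel arrays by one slot
    storages = Arrays.copyOf(storages, slot + 1);
    indices = Arrays.copyOf(indices, slot + 1);
    return slot;
  }
}

Under this sketch, feeding the second half of storageInfos2 through addStorage grows the capacity one slot at a time past the group width, which is exactly the behavior the capacity and index assertions in testAddStorage and testRemoveStorage check.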

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------