Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/07/01 12:45:39 UTC

[08/50] hadoop git commit: HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang.

HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de480d6c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de480d6c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de480d6c

Branch: refs/heads/YARN-2139
Commit: de480d6c8945bd8b5b00e8657b7a72ce8dd9b6b5
Parents: 1b764a0
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Jun 26 10:49:01 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Jun 26 10:49:01 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/blockmanagement/BlockInfo.java  |  12 +-
 .../blockmanagement/BlockInfoContiguous.java    |   2 +-
 .../BlockInfoUnderConstructionContiguous.java   |   2 +-
 .../server/blockmanagement/BlockManager.java    | 595 ++++++++++---------
 .../blockmanagement/DatanodeStorageInfo.java    |  15 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  18 +-
 .../hdfs/server/namenode/NamenodeFsck.java      |   2 +-
 .../server/blockmanagement/TestBlockInfo.java   |   2 +-
 .../blockmanagement/TestBlockManager.java       |   4 +-
 .../server/blockmanagement/TestNodeCount.java   |   2 +-
 .../TestOverReplicatedBlocks.java               |   4 +-
 .../blockmanagement/TestReplicationPolicy.java  |   2 +-
 13 files changed, 361 insertions(+), 302 deletions(-)
----------------------------------------------------------------------

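In brief, the refactor threads the datanode-reported replica through the
NameNode's bookkeeping alongside the block stored in the blocksMap:
BlockInfo.addStorage() and DatanodeStorageInfo.addBlock() gain a
reportedBlock parameter, checkMinReplication() becomes the block-aware
hasMinStorage(), and BlockToMarkCorrupt records the reported Block rather
than a BlockInfo. A minimal before/after sketch of a typical call site,
using names from the hunks below (surrounding class elided):

    // Before: thresholds and storage adds were phrased as replica counts.
    if (countNodes(lastBlock).liveReplicas() >= minReplication) {
      completeBlock(bc, bc.numBlocks() - 1, false);
    }
    storageInfo.addBlock(storedBlock);

    // After: the same checks go through hasMinStorage(), and the replica
    // the datanode actually reported travels with the stored block.
    if (hasMinStorage(lastBlock)) {
      completeBlock(bc, bc.numBlocks() - 1, false);
    }
    storageInfo.addBlock(storedBlock, reportedBlock);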

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 27e2e89..bb1b3ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -679,6 +679,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8651. Make hadoop-hdfs-project Native code -Wall-clean (Alan Burlison
     via Colin P. McCabe)
 
+    HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery
+    blocks. (Zhe Zhang via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 4cc2791..5ad992b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -172,19 +172,23 @@ public abstract class  BlockInfo extends Block
   public abstract int numNodes();
 
   /**
-   * Add a {@link DatanodeStorageInfo} location for a block.
+   * Add a {@link DatanodeStorageInfo} location for a block
+   * @param storage The storage to add
+   * @param reportedBlock The block reported from the datanode. This is only
+   *                      used by erasure coded blocks, this block's id contains
+   *                      information indicating the index of the block in the
+   *                      corresponding block group.
    */
-  abstract boolean addStorage(DatanodeStorageInfo storage);
+  abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
 
   /**
    * Remove {@link DatanodeStorageInfo} location for a block
    */
   abstract boolean removeStorage(DatanodeStorageInfo storage);
 
-
   /**
    * Replace the current BlockInfo with the new one in corresponding
-   * DatanodeStorageInfo's linked list
+   * DatanodeStorageInfo's linked list.
    */
   abstract void replaceBlock(BlockInfo newBlock);
 

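The reportedBlock parameter only matters for the erasure-coding work this
refactor prepares for: members of a striped block group share a common
group id, so a replica's index within the group is recoverable only from
the id the datanode reported, not from the stored block. A self-contained
toy of that id arithmetic; the class and the consecutive-id convention are
hypothetical illustrations, not part of this commit:

    // Hypothetical sketch: group members are assumed to be numbered
    // consecutively from the group's base id.
    public class StripedIndexSketch {
      static int indexInGroup(long groupBaseId, long reportedBlockId) {
        return (int) (reportedBlockId - groupBaseId);
      }

      public static void main(String[] args) {
        long groupBaseId = -4096L;         // assumed group base id
        long reportedId = groupBaseId + 3; // id the datanode reported
        // Only the reported id reveals this replica is member #3.
        System.out.println(indexInGroup(groupBaseId, reportedId)); // 3
      }
    }
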
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index b9abcd0..de64ad8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -45,7 +45,7 @@ public class BlockInfoContiguous extends BlockInfo {
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage) {
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
     return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
index c66675a..d3cb337 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
@@ -69,7 +69,7 @@ public class BlockInfoUnderConstructionContiguous extends
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage) {
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
     return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 

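Both contiguous implementations accept the widened signature but ignore the
new parameter, delegating unchanged:

    @Override
    boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
      // reportedBlock is unused here; contiguous replicas carry no index.
      return ContiguousBlockStorageOp.addStorage(this, storage);
    }
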
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 368d3b0..5bd4980 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -198,8 +198,8 @@ public class BlockManager {
    * Maps a StorageID to the set of blocks that are "extra" for this
    * DataNode. We'll eventually remove these extras.
    */
-  public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
-    new TreeMap<String, LightWeightLinkedSet<Block>>();
+  public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap =
+    new TreeMap<>();
 
   /**
    * Store set of Blocks that need to be replicated 1 or more times.
@@ -502,8 +502,8 @@ public class BlockManager {
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
     assert namesystem.hasWriteLock();
-    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    final List<DatanodeDescriptor> dead = new ArrayList<>();
     datanodeManager.fetchDatanodes(live, dead, false);
     out.println("Live Datanodes: " + live.size());
     out.println("Dead Datanodes: " + dead.size());
@@ -542,8 +542,8 @@ public class BlockManager {
     List<DatanodeDescriptor> containingNodes =
                                       new ArrayList<DatanodeDescriptor>();
     List<DatanodeStorageInfo> containingLiveReplicasNodes =
-      new ArrayList<DatanodeStorageInfo>();
-    
+      new ArrayList<>();
+
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
     chooseSourceDatanode(block, containingNodes,
@@ -572,7 +572,7 @@ public class BlockManager {
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
     
-    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+    for (DatanodeStorageInfo storage : getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       String state = "";
       if (corruptNodes != null && corruptNodes.contains(node)) {
@@ -595,11 +595,23 @@ public class BlockManager {
     return maxReplicationStreams;
   }
 
+  public int getDefaultStorageNum(BlockInfo block) {
+    return defaultReplication;
+  }
+
+  public short getMinStorageNum(BlockInfo block) {
+    return minReplication;
+  }
+
   /**
-   * @return true if the block has minimum replicas
+   * @return true if the block has minimum stored copies
    */
-  public boolean checkMinReplication(BlockInfo block) {
-    return (countNodes(block).liveReplicas() >= minReplication);
+  public boolean hasMinStorage(BlockInfo block) {
+    return hasMinStorage(block, countNodes(block).liveReplicas());
+  }
+
+  public boolean hasMinStorage(BlockInfo block, int liveNum) {
+    return liveNum >= getMinStorageNum(block);
   }
 
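getMinStorageNum() and hasMinStorage() generalize the old scalar
minReplication test so a block type can later supply its own minimum; in
this commit both still return the configured values unconditionally. The
call-site migration, as it recurs through the hunks below:

    // Old form, repeated across call sites:
    boolean old = countNodes(block).liveReplicas() >= minReplication;
    // New forms:
    boolean b1 = hasMinStorage(block);                  // counts live replicas
    boolean b2 = hasMinStorage(block, numLiveReplicas); // count already in hand
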
   /**
@@ -614,8 +626,9 @@ public class BlockManager {
   private static boolean commitBlock(
       final BlockInfoUnderConstruction block, final Block commitBlock)
       throws IOException {
-    if (block.getBlockUCState() == BlockUCState.COMMITTED)
+    if (block.getBlockUCState() == BlockUCState.COMMITTED) {
       return false;
+    }
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
@@ -635,18 +648,22 @@ public class BlockManager {
    */
   public boolean commitOrCompleteLastBlock(BlockCollection bc,
       Block commitBlock) throws IOException {
-    if(commitBlock == null)
+    if (commitBlock == null) {
       return false; // not committing, this is a block allocation retry
+    }
     BlockInfo lastBlock = bc.getLastBlock();
-    if(lastBlock == null)
+    if (lastBlock == null) {
       return false; // no blocks in file yet
-    if(lastBlock.isComplete())
+    }
+    if (lastBlock.isComplete()) {
       return false; // already completed (e.g. by syncBlock)
-    
+    }
+
     final boolean b = commitBlock(
         (BlockInfoUnderConstruction) lastBlock, commitBlock);
-    if(countNodes(lastBlock).liveReplicas() >= minReplication)
+    if(hasMinStorage(lastBlock)) {
       completeBlock(bc, bc.numBlocks()-1, false);
+    }
     return b;
   }
 
@@ -659,20 +676,24 @@ public class BlockManager {
    */
   private BlockInfo completeBlock(final BlockCollection bc,
       final int blkIndex, boolean force) throws IOException {
-    if(blkIndex < 0)
+    if (blkIndex < 0) {
       return null;
+    }
     BlockInfo curBlock = bc.getBlocks()[blkIndex];
-    if(curBlock.isComplete())
+    if(curBlock.isComplete()) {
       return curBlock;
+    }
     BlockInfoUnderConstruction ucBlock =
         (BlockInfoUnderConstruction) curBlock;
     int numNodes = ucBlock.numNodes();
-    if (!force && numNodes < minReplication)
+    if (!force && !hasMinStorage(curBlock, numNodes)) {
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
-    if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
+    }
+    if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) {
       throw new IOException(
           "Cannot complete block: block has not been COMMITTED by the client");
+    }
     BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
     // replace penultimate block in file
     bc.setBlock(blkIndex, completeBlock);
@@ -757,7 +778,7 @@ public class BlockManager {
     // count in safe-mode.
     namesystem.adjustSafeModeBlockTotals(
         // decrement safe if we had enough
-        targets.length >= minReplication ? -1 : 0,
+        hasMinStorage(oldBlock, targets.length) ? -1 : 0,
         // always decrement total blocks
         -1);
 
@@ -771,8 +792,8 @@ public class BlockManager {
    */
   private List<DatanodeStorageInfo> getValidLocations(Block block) {
     final List<DatanodeStorageInfo> locations
-        = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+        = new ArrayList<>(blocksMap.numNodes(block));
+    for(DatanodeStorageInfo storage : getStorages(block)) {
       // filter invalidate replicas
       if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
@@ -785,7 +806,7 @@ public class BlockManager {
       final BlockInfo[] blocks,
       final long offset, final long length, final int nrBlocksToReturn,
       final AccessMode mode) throws IOException {
-    int curBlk = 0;
+    int curBlk;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -798,10 +819,10 @@ public class BlockManager {
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return Collections.<LocatedBlock>emptyList();
+      return Collections.emptyList();
 
     long endOff = offset + length;
-    List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
+    List<LocatedBlock> results = new ArrayList<>(blocks.length);
     do {
       results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
       curPos += blocks[curBlk].getNumBytes();
@@ -814,7 +835,7 @@ public class BlockManager {
 
   private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
       final long endPos, final AccessMode mode) throws IOException {
-    int curBlk = 0;
+    int curBlk;
     long curPos = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -838,8 +859,8 @@ public class BlockManager {
   }
 
   /** @return a LocatedBlock for the given block */
-  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
-      ) throws IOException {
+  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
+      throws IOException {
     if (blk instanceof BlockInfoUnderConstruction) {
       if (blk.isComplete()) {
         throw new IOException(
@@ -849,7 +870,8 @@ public class BlockManager {
       final BlockInfoUnderConstruction uc =
           (BlockInfoUnderConstruction) blk;
       final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
-      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+      final ExtendedBlock eb =
+          new ExtendedBlock(namesystem.getBlockPoolId(), blk);
       return newLocatedBlock(eb, storages, pos, false);
     }
 
@@ -869,11 +891,12 @@ public class BlockManager {
     final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
-      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
+      for(DatanodeStorageInfo storage : getStorages(blk)) {
         final DatanodeDescriptor d = storage.getDatanodeDescriptor();
         final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
-        if (isCorrupt || (!replicaCorrupt))
+        if (isCorrupt || (!replicaCorrupt)) {
           machines[j++] = storage;
+        }
       }
     }
     assert j == machines.length :
@@ -1047,7 +1070,7 @@ public class BlockManager {
     for(int i=0; i<startBlock; i++) {
       iter.next();
     }
-    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+    List<BlockWithLocations> results = new ArrayList<>();
     long totalSize = 0;
     BlockInfo curBlock;
     while(totalSize<size && iter.hasNext()) {
@@ -1071,7 +1094,7 @@ public class BlockManager {
    
   /** Remove the blocks associated to the given datanode. */
   void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
-    final Iterator<? extends Block> it = node.getBlockIterator();
+    final Iterator<BlockInfo> it = node.getBlockIterator();
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
     }
@@ -1085,10 +1108,10 @@ public class BlockManager {
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
     assert namesystem.hasWriteLock();
-    final Iterator<? extends Block> it = storageInfo.getBlockIterator();
+    final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
-      Block block = it.next();
+      BlockInfo block = it.next();
       removeStoredBlock(block, node);
       invalidateBlocks.remove(node, block);
     }
@@ -1110,18 +1133,20 @@ public class BlockManager {
    * Adds block to list of blocks which will be invalidated on all its
    * datanodes.
    */
-  private void addToInvalidates(Block b) {
+  private void addToInvalidates(BlockInfo storedBlock) {
     if (!namesystem.isPopulatingReplQueues()) {
       return;
     }
     StringBuilder datanodes = new StringBuilder();
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
+        State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      invalidateBlocks.add(b, node, false);
+      invalidateBlocks.add(storedBlock, node, false);
       datanodes.append(node).append(" ");
     }
     if (datanodes.length() != 0) {
-      blockLog.info("BLOCK* addToInvalidates: {} {}", b, datanodes.toString());
+      blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock,
+          datanodes.toString());
     }
   }
 
@@ -1148,7 +1173,8 @@ public class BlockManager {
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
       final DatanodeInfo dn, String storageID, String reason) throws IOException {
     assert namesystem.hasWriteLock();
-    final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
+    final Block reportedBlock = blk.getLocalBlock();
+    final BlockInfo storedBlock = getStoredBlock(reportedBlock);
     if (storedBlock == null) {
       // Check if the replica is in the blockMap, if not
       // ignore the request for now. This could happen when BlockScanner
@@ -1164,8 +1190,8 @@ public class BlockManager {
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
-    
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+
+    markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
             blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
         storageID == null ? null : node.getStorageInfo(storageID),
         node);
@@ -1181,18 +1207,18 @@ public class BlockManager {
       DatanodeStorageInfo storageInfo,
       DatanodeDescriptor node) throws IOException {
 
-    if (b.corrupted.isDeleted()) {
+    if (b.stored.isDeleted()) {
       blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
           " corrupt as it does not belong to any file", b);
       addToInvalidates(b.corrupted, node);
       return;
     } 
     short expectedReplicas =
-        b.corrupted.getBlockCollection().getPreferredBlockReplication();
+        getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored);
 
     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
-      storageInfo.addBlock(b.stored);
+      storageInfo.addBlock(b.stored, b.corrupted);
     }
 
     // Add this replica to corruptReplicas Map
@@ -1202,8 +1228,8 @@ public class BlockManager {
     NumberReplicas numberOfReplicas = countNodes(b.stored);
     boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
         expectedReplicas;
-    boolean minReplicationSatisfied =
-        numberOfReplicas.liveReplicas() >= minReplication;
+    boolean minReplicationSatisfied = hasMinStorage(b.stored,
+        numberOfReplicas.liveReplicas());
     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedReplicas;
@@ -1346,7 +1372,7 @@ public class BlockManager {
     int additionalReplRequired;
 
     int scheduledWork = 0;
-    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+    List<ReplicationWork> work = new LinkedList<>();
 
     namesystem.writeLock();
     try {
@@ -1363,11 +1389,11 @@ public class BlockManager {
               continue;
             }
 
-            requiredReplication = bc.getPreferredBlockReplication();
+            requiredReplication = getExpectedReplicaNum(bc, block);
 
             // get a source data-node
-            containingNodes = new ArrayList<DatanodeDescriptor>();
-            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
+            containingNodes = new ArrayList<>();
+            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
             NumberReplicas numReplicas = new NumberReplicas();
             srcNode = chooseSourceDatanode(
                 block, containingNodes, liveReplicaNodes, numReplicas,
@@ -1387,7 +1413,7 @@ public class BlockManager {
       
             if (numEffectiveReplicas >= requiredReplication) {
               if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                   (blockHasEnoughRacks(block)) ) {
+                   (blockHasEnoughRacks(block, requiredReplication)) ) {
                 neededReplications.remove(block, priority); // remove from neededReplications
                 blockLog.info("BLOCK* Removing {} from neededReplications as" +
                         " it has enough replicas", block);
@@ -1411,7 +1437,7 @@ public class BlockManager {
       namesystem.writeUnlock();
     }
 
-    final Set<Node> excludedNodes = new HashSet<Node>();
+    final Set<Node> excludedNodes = new HashSet<>();
     for(ReplicationWork rw : work){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
@@ -1447,7 +1473,7 @@ public class BlockManager {
             rw.targets = null;
             continue;
           }
-          requiredReplication = bc.getPreferredBlockReplication();
+          requiredReplication = getExpectedReplicaNum(bc, block);
 
           // do not schedule more if enough replicas is already pending
           NumberReplicas numReplicas = countNodes(block);
@@ -1456,7 +1482,7 @@ public class BlockManager {
 
           if (numEffectiveReplicas >= requiredReplication) {
             if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block)) ) {
+                 (blockHasEnoughRacks(block, requiredReplication)) ) {
               neededReplications.remove(block, priority); // remove from neededReplications
               rw.targets = null;
               blockLog.info("BLOCK* Removing {} from neededReplications as" +
@@ -1466,7 +1492,7 @@ public class BlockManager {
           }
 
           if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-               (!blockHasEnoughRacks(block)) ) {
+               (!blockHasEnoughRacks(block, requiredReplication)) ) {
             if (rw.srcNode.getNetworkLocation().equals(
                 targets[0].getDatanodeDescriptor().getNetworkLocation())) {
               //No use continuing, unless a new rack in this case
@@ -1581,7 +1607,7 @@ public class BlockManager {
   List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
     List<DatanodeDescriptor> datanodeDescriptors = null;
     if (nodes != null) {
-      datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
+      datanodeDescriptors = new ArrayList<>(nodes.size());
       for (int i = 0; i < nodes.size(); i++) {
         DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
         if (node != null) {
@@ -1637,9 +1663,9 @@ public class BlockManager {
     int excess = 0;
     
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+    for(DatanodeStorageInfo storage : getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      LightWeightLinkedSet<Block> excessBlocks =
+      LightWeightLinkedSet<BlockInfo> excessBlocks =
         excessReplicateMap.get(node.getDatanodeUuid());
       int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; 
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
@@ -1707,7 +1733,7 @@ public class BlockManager {
            * Use the blockinfo from the blocksmap to be certain we're working
            * with the most up-to-date block information (e.g. genstamp).
            */
-          BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);
+          BlockInfo bi = getStoredBlock(timedOutItems[i]);
           if (bi == null) {
             continue;
           }
@@ -1757,7 +1783,7 @@ public class BlockManager {
     final BlockInfoUnderConstruction storedBlock;
     final Block reportedBlock;
     final ReplicaState reportedState;
-    
+
     StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
         Block reportedBlock, ReplicaState reportedState) {
       this.storedBlock = storedBlock;
@@ -1765,14 +1791,34 @@ public class BlockManager {
       this.reportedState = reportedState;
     }
   }
-  
+
+  private static class BlockInfoToAdd {
+    private final BlockInfo stored;
+    private final Block reported;
+
+    BlockInfoToAdd(BlockInfo stored, Block reported) {
+      this.stored = stored;
+      this.reported = reported;
+    }
+
+    public BlockInfo getStored() {
+      return stored;
+    }
+
+    public Block getReported() {
+      return reported;
+    }
+  }
+
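The BlockInfoToAdd pair above exists because the two blocks are no longer
interchangeable: the stored BlockInfo is the NameNode's canonical object,
while the reported Block carries the id and genstamp the datanode actually
sent. processReport() later in this diff drains the queue by forwarding
both halves:

    for (BlockInfoToAdd b : toAdd) {
      addStoredBlock(b.getStored(), b.getReported(), storageInfo, null,
          numBlocksLogged < maxNumBlocksToLog);
      numBlocksLogged++;
    }
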
   /**
    * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
    * list of blocks that should be considered corrupt due to a block report.
    */
   private static class BlockToMarkCorrupt {
-    /** The corrupted block in a datanode. */
-    final BlockInfo corrupted;
+    /** The corrupted block in a datanode. This is the one reported by the
+     * datanode.
+     */
+    final Block corrupted;
     /** The corresponding block stored in the BlockManager. */
     final BlockInfo stored;
     /** The reason to mark corrupt. */
@@ -1780,7 +1826,7 @@ public class BlockManager {
     /** The reason code to be stored */
     final Reason reasonCode;
 
-    BlockToMarkCorrupt(BlockInfo corrupted,
+    BlockToMarkCorrupt(Block corrupted,
         BlockInfo stored, String reason,
         Reason reasonCode) {
       Preconditions.checkNotNull(corrupted, "corrupted is null");
@@ -1792,15 +1838,9 @@ public class BlockManager {
       this.reasonCode = reasonCode;
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, String reason,
-        Reason reasonCode) {
-      this(stored, stored, reason, reasonCode);
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
-        Reason reasonCode) {
-      this(new BlockInfoContiguous(stored), stored,
-          reason, reasonCode);
+    BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
+        String reason, Reason reasonCode) {
+      this(corrupted, stored, reason, reasonCode);
       //the corrupted block in datanode has a different generation stamp
       corrupted.setGenerationStamp(gs);
     }
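Note the changed generation-stamp constructor: it previously cloned the
stored block (new BlockInfoContiguous(stored)) before overwriting the
genstamp, whereas the caller now supplies the corrupted Block itself, so
setGenerationStamp(gs) never mutates a blocksMap object. Call sites later
in this diff build that argument from the datanode's report, e.g.:

    final long reportedGS = reported.getGenerationStamp();
    return new BlockToMarkCorrupt(new Block(reported), storedBlock,
        reportedGS, "block is " + ucState + " and reported genstamp "
        + reportedGS + " does not match genstamp in block map "
        + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
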
@@ -1987,7 +2027,7 @@ public class BlockManager {
           break;
         }
 
-        BlockInfo bi = blocksMap.getStoredBlock(b);
+        BlockInfo bi = getStoredBlock(b);
         if (bi == null) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
@@ -2019,7 +2059,7 @@ public class BlockManager {
           endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
     }
   }
-  
+
   private Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
@@ -2027,25 +2067,26 @@ public class BlockManager {
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
-    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toRemove = new TreeSet<Block>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
+    Collection<BlockInfo> toRemove = new TreeSet<>();
+    Collection<Block> toInvalidate = new LinkedList<>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<>();
     reportDiff(storageInfo, report,
         toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-   
+
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
-    for (StatefulBlockInfo b : toUC) { 
+    for (StatefulBlockInfo b : toUC) {
       addStoredBlockUnderConstruction(b, storageInfo);
     }
-    for (Block b : toRemove) {
+    for (BlockInfo b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
-    for (BlockInfo b : toAdd) {
-      addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
+    for (BlockInfoToAdd b : toAdd) {
+      addStoredBlock(b.getStored(), b.getReported(), storageInfo, null,
+          numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2066,17 +2107,17 @@ public class BlockManager {
    * Mark block replicas as corrupt except those on the storages in 
    * newStorages list.
    */
-  public void markBlockReplicasAsCorrupt(BlockInfo block,
-      long oldGenerationStamp, long oldNumBytes, 
+  public void markBlockReplicasAsCorrupt(Block oldBlock, BlockInfo block,
+      long oldGenerationStamp, long oldNumBytes,
       DatanodeStorageInfo[] newStorages) throws IOException {
     assert namesystem.hasWriteLock();
     BlockToMarkCorrupt b = null;
     if (block.getGenerationStamp() != oldGenerationStamp) {
-      b = new BlockToMarkCorrupt(block, oldGenerationStamp,
+      b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
           "genstamp does not match " + oldGenerationStamp
           + " : " + block.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
     } else if (block.getNumBytes() != oldNumBytes) {
-      b = new BlockToMarkCorrupt(block,
+      b = new BlockToMarkCorrupt(oldBlock, block,
           "length does not match " + oldNumBytes
           + " : " + block.getNumBytes(), Reason.SIZE_MISMATCH);
     } else {
@@ -2135,7 +2176,7 @@ public class BlockManager {
         continue;
       }
       
-      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+      BlockInfo storedBlock = getStoredBlock(iblk);
       // If block does not belong to any file, we are done.
       if (storedBlock == null) continue;
       
@@ -2173,24 +2214,26 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, storageInfo);
+        addStoredBlockImmediate(storedBlock, iblk, storageInfo);
       }
     }
   }
 
-  private void reportDiff(DatanodeStorageInfo storageInfo, 
-      BlockListAsLongs newReport, 
-      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
-      Collection<Block> toRemove,           // remove from DatanodeDescriptor
+  private void reportDiff(DatanodeStorageInfo storageInfo,
+      BlockListAsLongs newReport,
+      Collection<BlockInfoToAdd> toAdd,     // add to DatanodeDescriptor
+      Collection<BlockInfo> toRemove,       // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    // place a delimiter in the list which separates blocks 
+    // place a delimiter in the list which separates blocks
     // that have been reported from those that have not
-    BlockInfo delimiter = new BlockInfoContiguous(new Block(), (short) 1);
-    AddBlockResult result = storageInfo.addBlock(delimiter);
-    assert result == AddBlockResult.ADDED 
+    Block delimiterBlock = new Block();
+    BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
+        (short) 1);
+    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
+    assert result == AddBlockResult.ADDED
         : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
     int curIndex;
@@ -2207,7 +2250,8 @@ public class BlockManager {
       // move block to the head of the list
       if (storedBlock != null &&
           (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
-        headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
+        headIndex =
+            storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
       }
     }
 
@@ -2215,8 +2259,9 @@ public class BlockManager {
     // all of them are next to the delimiter
     Iterator<BlockInfo> it =
         storageInfo.new BlockIterator(delimiter.getNext(0));
-    while(it.hasNext())
+    while (it.hasNext()) {
       toRemove.add(it.next());
+    }
     storageInfo.removeBlock(delimiter);
   }
 
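reportDiff()'s delimiter trick, for readers new to it: a sentinel block is
placed at the head of the storage's block list, each reported replica is
moved in front of it, and whatever still trails the sentinel was never
reported and lands in toRemove. A self-contained toy of the same idea on a
plain LinkedList (the names are stand-ins, not HDFS classes):

    import java.util.LinkedList;
    import java.util.List;

    public class ReportDiffSketch {
      public static void main(String[] args) {
        LinkedList<String> stored =
            new LinkedList<>(List.of("b1", "b2", "b3"));
        String delimiter = "<delimiter>";
        stored.addFirst(delimiter);             // sentinel at the head
        for (String reported : List.of("b2")) { // datanode reported only b2
          stored.remove(reported);
          stored.addFirst(reported);            // moveBlockToHead analogue
        }
        int cut = stored.indexOf(delimiter);
        // Everything behind the sentinel was never reported.
        System.out.println(stored.subList(cut + 1, stored.size())); // [b1, b3]
        stored.remove(delimiter);               // removeBlock(delimiter)
      }
    }
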
@@ -2253,12 +2298,12 @@ public class BlockManager {
    */
   private BlockInfo processReportedBlock(
       final DatanodeStorageInfo storageInfo,
-      final Block block, final ReplicaState reportedState, 
-      final Collection<BlockInfo> toAdd,
-      final Collection<Block> toInvalidate, 
+      final Block block, final ReplicaState reportedState,
+      final Collection<BlockInfoToAdd> toAdd,
+      final Collection<Block> toInvalidate,
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
-    
+
     DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
 
     if(LOG.isDebugEnabled()) {
@@ -2266,16 +2311,16 @@ public class BlockManager {
           + " on " + dn + " size " + block.getNumBytes()
           + " replicaState = " + reportedState);
     }
-  
+
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
-    
+
     // find block by blockId
-    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    BlockInfo storedBlock = getStoredBlock(block);
     if(storedBlock == null) {
       // If blocksMap does not contain reported block id,
       // the replica should be removed from the data-node.
@@ -2283,7 +2328,7 @@ public class BlockManager {
       return null;
     }
     BlockUCState ucState = storedBlock.getBlockUCState();
-    
+
     // Block is on the NN
     if(LOG.isDebugEnabled()) {
       LOG.debug("In memory blockUCState = " + ucState);
@@ -2328,8 +2373,8 @@ public class BlockManager {
     // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
         && (storedBlock.findStorageInfo(storageInfo) == -1 ||
-            corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
-      toAdd.add(storedBlock);
+        corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+      toAdd.add(new BlockInfoToAdd(storedBlock, block));
     }
     return storedBlock;
   }
@@ -2375,7 +2420,7 @@ public class BlockManager {
       if (rbi.getReportedState() == null) {
         // This is a DELETE_BLOCK request
         DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
-        removeStoredBlock(rbi.getBlock(),
+        removeStoredBlock(getStoredBlock(rbi.getBlock()),
             storageInfo.getDatanodeDescriptor());
       } else {
         processAndHandleReportedBlock(rbi.getStorageInfo(),
@@ -2423,15 +2468,15 @@ public class BlockManager {
       case COMMITTED:
         if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(storedBlock, reportedGS,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
               "block is " + ucState + " and reported genstamp " + reportedGS
-              + " does not match genstamp in block map "
-              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+                  + " does not match genstamp in block map "
+                  + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
         } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
-          return new BlockToMarkCorrupt(storedBlock,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
               "block is " + ucState + " and reported length " +
-              reported.getNumBytes() + " does not match " +
-              "length in block map " + storedBlock.getNumBytes(),
+                  reported.getNumBytes() + " does not match " +
+                  "length in block map " + storedBlock.getNumBytes(),
               Reason.SIZE_MISMATCH);
         } else {
           return null; // not corrupt
@@ -2439,11 +2484,12 @@ public class BlockManager {
       case UNDER_CONSTRUCTION:
         if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
-              + ucState + " and reported state " + reportedState
-              + ", But reported genstamp " + reportedGS
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
+              reportedGS, "block is " + ucState + " and reported state "
+              + reportedState + ", But reported genstamp " + reportedGS
               + " does not match genstamp in block map "
-              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+              + storedBlock.getGenerationStamp(),
+              Reason.GENSTAMP_MISMATCH);
         }
         return null;
       default:
@@ -2453,12 +2499,15 @@ public class BlockManager {
     case RWR:
       if (!storedBlock.isComplete()) {
         return null; // not corrupt
-      } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
+      } else if (storedBlock.getGenerationStamp() !=
+          reported.getGenerationStamp()) {
         final long reportedGS = reported.getGenerationStamp();
-        return new BlockToMarkCorrupt(storedBlock, reportedGS,
-            "reported " + reportedState + " replica with genstamp " + reportedGS
-            + " does not match COMPLETE block's genstamp in block map "
-            + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+        return new BlockToMarkCorrupt(
+            new Block(reported), storedBlock, reportedGS,
+            "reported " + reportedState +
+                " replica with genstamp " + reportedGS +
+                " does not match COMPLETE block's genstamp in block map " +
+                storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -2470,7 +2519,7 @@ public class BlockManager {
               "complete with the same genstamp");
           return null;
         } else {
-          return new BlockToMarkCorrupt(storedBlock,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
               "reported replica has invalid state " + reportedState,
               Reason.INVALID_STATE);
         }
@@ -2483,7 +2532,8 @@ public class BlockManager {
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
+      return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg,
+          Reason.INVALID_STATE);
     }
   }
 
@@ -2516,7 +2566,7 @@ public class BlockManager {
 
     if (ucBlock.reportedState == ReplicaState.FINALIZED &&
         (block.findStorageInfo(storageInfo) < 0)) {
-      addStoredBlock(block, storageInfo, null, true);
+      addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
     }
   } 
 
@@ -2531,23 +2581,23 @@ public class BlockManager {
    * 
    * @throws IOException
    */
-  private void addStoredBlockImmediate(BlockInfo storedBlock,
+  private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
       DatanodeStorageInfo storageInfo)
-  throws IOException {
+      throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
-    if (!namesystem.isInStartupSafeMode() 
+    if (!namesystem.isInStartupSafeMode()
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, storageInfo, null, false);
+      addStoredBlock(storedBlock, reported, storageInfo, null, false);
       return;
     }
 
     // just add it
-    AddBlockResult result = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
-        && numCurrentReplica >= minReplication) {
+        && hasMinStorage(storedBlock, numCurrentReplica)) {
       completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -2561,19 +2611,20 @@ public class BlockManager {
   /**
    * Modify (block-->datanode) map. Remove block from set of
    * needed replications if this takes care of the problem.
-   * @return the block that is stored in blockMap.
+   * @return the block that is stored in blocksMap.
    */
   private Block addStoredBlock(final BlockInfo block,
-                               DatanodeStorageInfo storageInfo,
-                               DatanodeDescriptor delNodeHint,
-                               boolean logEveryBlock)
-  throws IOException {
+      final Block reportedBlock,
+      DatanodeStorageInfo storageInfo,
+      DatanodeDescriptor delNodeHint,
+      boolean logEveryBlock)
+      throws IOException {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
-      storedBlock = blocksMap.getStoredBlock(block);
+      storedBlock = getStoredBlock(block);
     } else {
       storedBlock = block;
     }
@@ -2587,10 +2638,9 @@ public class BlockManager {
       return block;
     }
     BlockCollection bc = storedBlock.getBlockCollection();
-    assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    AddBlockResult result = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
 
     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
@@ -2618,10 +2668,10 @@ public class BlockManager {
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
     int numCurrentReplica = numLiveReplicas
-      + pendingReplications.getNumReplicas(storedBlock);
+        + pendingReplications.getNumReplicas(storedBlock);
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
-        numLiveReplicas >= minReplication) {
+        hasMinStorage(storedBlock, numLiveReplicas)) {
       storedBlock = completeBlock(bc, storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -2631,7 +2681,7 @@ public class BlockManager {
       // handles the safe block count maintenance.
       namesystem.incrementSafeBlockCount(numCurrentReplica);
     }
-    
+
     // if file is under construction, then done for now
     if (bc.isUnderConstruction()) {
       return storedBlock;
@@ -2643,7 +2693,7 @@ public class BlockManager {
     }
 
     // handle underReplication/overReplication
-    short fileReplication = bc.getPreferredBlockReplication();
+    short fileReplication = getExpectedReplicaNum(bc, storedBlock);
     if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedAndDecommissioning(), fileReplication);
@@ -2659,11 +2709,12 @@ public class BlockManager {
     int numCorruptNodes = num.corruptReplicas();
     if (numCorruptNodes != corruptReplicasCount) {
       LOG.warn("Inconsistent number of corrupt replicas for " +
-          storedBlock + "blockMap has " + numCorruptNodes + 
+          storedBlock + ". blockMap has " + numCorruptNodes +
           " but corrupt replicas map has " + corruptReplicasCount);
     }
-    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
-      invalidateCorruptReplicas(storedBlock);
+    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
+      invalidateCorruptReplicas(storedBlock, reportedBlock);
+    }
     return storedBlock;
   }
 
@@ -2695,7 +2746,7 @@ public class BlockManager {
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfo blk) {
+  private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
@@ -2705,8 +2756,8 @@ public class BlockManager {
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
-              Reason.ANY), node)) {
+        if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
+            Reason.ANY), node)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
@@ -2874,7 +2925,7 @@ public class BlockManager {
     }
     // calculate current replication
     short expectedReplication =
-        block.getBlockCollection().getPreferredBlockReplication();
+        getExpectedReplicaNum(block.getBlockCollection(), block);
     NumberReplicas num = countNodes(block);
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
@@ -2933,14 +2984,14 @@ public class BlockManager {
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  private void processOverReplicatedBlock(final Block block,
+  private void processOverReplicatedBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
-    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2954,8 +3005,8 @@ public class BlockManager {
         postponeBlock(block);
         return;
       }
-      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
-          .getDatanodeUuid());
+      LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+          cur.getDatanodeUuid());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
@@ -2965,7 +3016,7 @@ public class BlockManager {
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication, 
+    chooseExcessReplicates(nonExcess, block, replication,
         addedNode, delNodeHint, blockplacement);
   }
 
@@ -2984,29 +3035,29 @@ public class BlockManager {
    * If no such a node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, 
-                              Block b, short replication,
-                              DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint,
-                              BlockPlacementPolicy replicator) {
+  private void chooseExcessReplicates(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint,
+      BlockPlacementPolicy replicator) {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
-    BlockCollection bc = getBlockCollection(b);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+    BlockCollection bc = getBlockCollection(storedBlock);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
     final List<StorageType> excessTypes = storagePolicy.chooseExcess(
         replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
 
+    final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
 
-    final Map<String, List<DatanodeStorageInfo>> rackMap
-        = new HashMap<String, List<DatanodeStorageInfo>>();
-    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
-    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
-    
     // split nodes into two sets
     // moreThanOne contains nodes on rack with more than one replica
     // exactlyOne contains the remaining nodes
     replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
-    
+
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
     // otherwise one node with least space from remains
@@ -3021,7 +3072,7 @@ public class BlockManager {
           moreThanOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(bc, b, replication,
+        cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication,
             moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
@@ -3030,24 +3081,29 @@ public class BlockManager {
       replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
           exactlyOne, cur);
 
-      nonExcess.remove(cur);
-      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
-
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.  
-      //
-      // The 'invalidate' list is used to inform the datanode the block 
-      // should be deleted.  Items are removed from the invalidate list
-      // upon giving instructions to the namenode.
-      //
-      addToInvalidates(b, cur.getDatanodeDescriptor());
-      blockLog.info("BLOCK* chooseExcessReplicates: "
-                +"({}, {}) is added to invalidated blocks set", cur, b);
+      processChosenExcessReplica(nonExcess, cur, storedBlock);
     }
   }
 
+  private void processChosenExcessReplica(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
+    nonExcess.remove(chosen);
+    addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+    //
+    // The 'excessblocks' tracks blocks until we get confirmation
+    // that the datanode has deleted them; the only way we remove them
+    // is when we get a "removeBlock" message.
+    //
+    // The 'invalidate' list is used to inform the datanode the block
+    // should be deleted.  Items are removed from the invalidate list
+    // upon giving instructions to the datanodes.
+    //
+    addToInvalidates(storedBlock, chosen.getDatanodeDescriptor());
+    blockLog.info("BLOCK* chooseExcessReplicates: "
+        +"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
+  }
+
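With the per-iteration tail of chooseExcessReplicates() factored into
processChosenExcessReplica(), every storage picked for deletion follows one
path: leave nonExcess, enter excessReplicateMap, and join the invalidate
list. In outline (the surrounding while loop's condition is not shown in
this diff):

    // Sketch of the loop tail after the refactor:
    replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
        exactlyOne, cur);
    processChosenExcessReplica(nonExcess, cur, storedBlock);
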
   /** Check if we can use delHint */
   static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
       DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
@@ -3069,17 +3125,18 @@ public class BlockManager {
     }
   }
 
-  private void addToExcessReplicate(DatanodeInfo dn, Block block) {
+  private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
     assert namesystem.hasWriteLock();
-    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
+    LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+        dn.getDatanodeUuid());
     if (excessBlocks == null) {
-      excessBlocks = new LightWeightLinkedSet<Block>();
+      excessBlocks = new LightWeightLinkedSet<>();
       excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
     }
-    if (excessBlocks.add(block)) {
+    if (excessBlocks.add(storedBlock)) {
       excessBlocksCount.incrementAndGet();
       blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
-          + " excessReplicateMap", dn, block);
+          + " excessReplicateMap", dn, storedBlock);
     }
   }
 
@@ -3091,26 +3148,26 @@ public class BlockManager {
           QUEUE_REASON_FUTURE_GENSTAMP);
       return;
     }
-    removeStoredBlock(block, node);
+    removeStoredBlock(getStoredBlock(block), node);
   }
 
   /**
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
-    blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
+  public void removeStoredBlock(BlockInfo storedBlock,
+      DatanodeDescriptor node) {
+    blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
     assert (namesystem.hasWriteLock());
     {
-      BlockInfo storedBlock = getStoredBlock(block);
       if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
         blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
-            " removed from node {}", block, node);
+            " removed from node {}", storedBlock, node);
         return;
       }
 
       CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
-          .get(new CachedBlock(block.getBlockId(), (short) 0, false));
+          .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
       if (cblock != null) {
         boolean removed = false;
         removed |= node.getPendingCached().remove(cblock);
@@ -3118,7 +3175,7 @@ public class BlockManager {
         removed |= node.getPendingUncached().remove(cblock);
         if (removed) {
           blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
-              + "related lists on node {}", block, node);
+              + "related lists on node {}", storedBlock, node);
         }
       }
 
@@ -3128,7 +3185,7 @@ public class BlockManager {
       // necessary. In that case, put block on a possibly-will-
       // be-replicated list.
       //
-      BlockCollection bc = blocksMap.getBlockCollection(block);
+      BlockCollection bc = storedBlock.getBlockCollection();
       if (bc != null) {
         namesystem.decrementSafeBlockCount(storedBlock);
         updateNeededReplications(storedBlock, -1, 0);
@@ -3138,13 +3195,13 @@ public class BlockManager {
       // We've removed a block from a node, so it's definitely no longer
       // in "excess" there.
       //
-      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
-          .getDatanodeUuid());
+      LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+          node.getDatanodeUuid());
       if (excessBlocks != null) {
-        if (excessBlocks.remove(block)) {
+        if (excessBlocks.remove(storedBlock)) {
           excessBlocksCount.decrementAndGet();
           blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
-              "excessBlocks", block);
+              "excessBlocks", storedBlock);
           if (excessBlocks.size() == 0) {
             excessReplicateMap.remove(node.getDatanodeUuid());
           }
@@ -3152,7 +3209,7 @@ public class BlockManager {
       }
 
       // Remove the replica from corruptReplicas
-      corruptReplicas.removeFromCorruptReplicasMap(block, node);
+      corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
     }
   }
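
The removeStoredBlock signature change is more than cosmetic: the blocksMap
lookup moves to the caller (note the removeStoredBlock(getStoredBlock(block),
node) call above), so code paths that already hold a resolved BlockInfo avoid
a redundant hash lookup. A toy illustration of the pattern, with heavily
simplified stand-in types rather than the real HDFS classes:

    import java.util.HashMap;
    import java.util.Map;

    class LookupHoistSketch {
      static final class Block { final long id; Block(long id) { this.id = id; } }
      static final class BlockInfo { final long id; BlockInfo(long id) { this.id = id; } }

      private final Map<Long, BlockInfo> blocksMap = new HashMap<>();

      BlockInfo getStoredBlock(Block b) {
        return blocksMap.get(b.id);
      }

      // Post-refactor shape: the method consumes an already-resolved
      // BlockInfo and exits early if resolution failed upstream.
      void removeStoredBlock(BlockInfo storedBlock, String nodeUuid) {
        if (storedBlock == null) {
          return; // mirrors the "has already been removed" early exit
        }
        // ... update per-node replica bookkeeping here ...
      }
    }
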
 
@@ -3160,7 +3217,7 @@ public class BlockManager {
    * Get all valid locations of the block & add the block to results
    * return the length of the added block; 0 if the block is not added
    */
-  private long addBlock(Block block, List<BlockWithLocations> results) {
+  private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
     final List<DatanodeStorageInfo> locations = getValidLocations(block);
     if(locations.size() == 0) {
       return 0;
@@ -3212,31 +3269,32 @@ public class BlockManager {
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
-  
+
   private void processAndHandleReportedBlock(
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
-    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
+    Collection<Block> toInvalidate = new LinkedList<>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<>();
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
-    processReportedBlock(storageInfo, block, reportedState,
-                              toAdd, toInvalidate, toCorrupt, toUC);
+    processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
+        toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
     assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
-      : "The block should be only in one of the lists.";
+        : "The block should be only in one of the lists.";
 
-    for (StatefulBlockInfo b : toUC) { 
+    for (StatefulBlockInfo b : toUC) {
       addStoredBlockUnderConstruction(b, storageInfo);
     }
     long numBlocksLogged = 0;
-    for (BlockInfo b : toAdd) {
-      addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+    for (BlockInfoToAdd b : toAdd) {
+      addStoredBlock(b.getStored(), b.getReported(), storageInfo, delHintNode,
+          numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
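
The toAdd list now carries BlockInfoToAdd elements pairing the NameNode's
stored BlockInfo with the block exactly as the datanode reported it. The class
itself is defined elsewhere in this patch and not shown in this excerpt; a
minimal pair type consistent with how the loop uses it might look like the
sketch below (everything beyond getStored and getReported is an assumption):

    final class BlockInfoToAddSketch<S, R> {
      private final S stored;    // the NameNode-side BlockInfo
      private final R reported;  // the replica as reported by the DN

      BlockInfoToAddSketch(S stored, R reported) {
        this.stored = stored;
        this.reported = reported;
      }

      S getStored()   { return stored; }
      R getReported() { return reported; }
    }

Carrying both values through addStoredBlock is what lets the stored block and
the reported replica diverge, which appears to be the point of the refactor.
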
@@ -3301,7 +3359,7 @@ public class BlockManager {
                                       ReplicaState.RBW, null);
         break;
       default:
-        String msg = 
+        String msg =
           "Unknown block status code reported by " + nodeID +
           ": " + rdbi;
         blockLog.warn(msg);
@@ -3337,8 +3395,8 @@ public class BlockManager {
       } else if (node.isDecommissioned()) {
         decommissioned++;
       } else {
-        LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
-            .getDatanodeUuid());
+        LightWeightLinkedSet<BlockInfo> blocksExcess =
+            excessReplicateMap.get(node.getDatanodeUuid());
         if (blocksExcess != null && blocksExcess.contains(b)) {
           excess++;
         } else {
@@ -3391,13 +3449,13 @@ public class BlockManager {
     int numOverReplicated = 0;
     while(it.hasNext()) {
       final BlockInfo block = it.next();
-      BlockCollection bc = blocksMap.getBlockCollection(block);
-      short expectedReplication = bc.getPreferredBlockReplication();
+      int expectedReplication = this.getReplication(block);
       NumberReplicas num = countNodes(block);
       int numCurrentReplica = num.liveReplicas();
       if (numCurrentReplica > expectedReplication) {
         // over-replicated block 
-        processOverReplicatedBlock(block, expectedReplication, null, null);
+        processOverReplicatedBlock(block, (short) expectedReplication, null,
+            null);
         numOverReplicated++;
       }
     }
@@ -3423,7 +3481,7 @@ public class BlockManager {
     if (pendingReplicationBlocksCount == 0 &&
         underReplicatedBlocksCount == 0) {
       LOG.info("Node {} is dead and there are no under-replicated" +
-          " blocks or blocks pending replication. Safe to decommission.", 
+          " blocks or blocks pending replication. Safe to decommission.",
           node);
       return true;
     }
@@ -3441,6 +3499,12 @@ public class BlockManager {
     return blocksMap.size();
   }
 
+
+  /** @return an iterator of the datanodes. */
+  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
+    return blocksMap.getStorages(block);
+  }
+
   public DatanodeStorageInfo[] getStorages(BlockInfo block) {
     final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
     int i = 0;
@@ -3529,10 +3593,12 @@ public class BlockManager {
         final BlockInfoUnderConstruction uc =
             (BlockInfoUnderConstruction)b;
         final int numNodes = b.numNodes();
-        LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
-          + uc.getBlockUCState() + ", replication# = " + numNodes
-          + (numNodes < minReplication ? " < ": " >= ")
-          + " minimum = " + minReplication + ") in file " + src);
+        final int min = getMinStorageNum(b);
+        final BlockUCState state = b.getBlockUCState();
+        LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state
+            + ", replication# = " + numNodes
+            + (numNodes < min ? " < " : " >= ")
+            + " minimum = " + min + ") in file " + src);
         return false;
       }
     }
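
Note the switch from the minReplication field to getMinStorageNum(b): the
minimum replica count is now obtained per block rather than read from a single
global. For contiguous blocks this presumably still resolves to minReplication;
the indirection is what would let other block layouts supply a different
minimum later. A hedged sketch of such a dispatch (the enum and the constant
are illustrative, not the real API):

    class MinStorageSketch {
      enum BlockLayout { CONTIGUOUS }

      private final int minReplication = 1; // cluster-wide default

      // Per-block minimum, replacing direct reads of minReplication.
      int getMinStorageNum(BlockLayout layout) {
        switch (layout) {
          case CONTIGUOUS:
            return minReplication; // unchanged for contiguous blocks
          default:
            throw new AssertionError("unhandled layout: " + layout);
        }
      }
    }
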
@@ -3543,15 +3609,15 @@ public class BlockManager {
    * @return 0 if the block is not found;
    *         otherwise, return the replication factor of the block.
    */
-  private int getReplication(Block block) {
+  private int getReplication(BlockInfo block) {
     final BlockCollection bc = blocksMap.getBlockCollection(block);
-    return bc == null? 0: bc.getPreferredBlockReplication();
+    return bc == null? 0: getExpectedReplicaNum(bc, block);
   }
 
 
   /**
-   * Get blocks to invalidate for <i>nodeId</i>
-   * in {@link #invalidateBlocks}.
+   * Get blocks to invalidate for <i>nodeId</i>
+   * in {@link #invalidateBlocks}.
    *
    * @return number of blocks scheduled for removal during this iteration.
    */
@@ -3589,22 +3655,20 @@ public class BlockManager {
     return toInvalidate.size();
   }
 
-  boolean blockHasEnoughRacks(Block b) {
+  boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
     if (!this.shouldCheckForEnoughRacks) {
       return true;
     }
-    boolean enoughRacks = false;;
-    Collection<DatanodeDescriptor> corruptNodes = 
-                                  corruptReplicas.getNodes(b);
-    int numExpectedReplicas = getReplication(b);
+    boolean enoughRacks = false;
+    Collection<DatanodeDescriptor> corruptNodes =
+        corruptReplicas.getNodes(storedBlock);
     String rackName = null;
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+    for(DatanodeStorageInfo storage : getStorages(storedBlock)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1 ||
-              (numExpectedReplicas > 1 &&
-                  !datanodeManager.hasClusterEverBeenMultiRack())) {
+          if (expectedStorageNum == 1 || (expectedStorageNum > 1 &&
+              !datanodeManager.hasClusterEverBeenMultiRack())) {
             enoughRacks = true;
             break;
           }
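
blockHasEnoughRacks now receives the expected storage count from its caller
instead of recomputing it via getReplication. The rack rule itself looks
unchanged and can be restated compactly: with a single expected replica, or on
a cluster that has never spanned more than one rack, any live non-corrupt
replica suffices; otherwise the live replicas must cover at least two distinct
racks. A condensed restatement (not the NameNode code; decommissioning and
corrupt replicas are assumed to be filtered out of the input already):

    import java.util.List;

    class RackRuleSketch {
      static boolean hasEnoughRacks(List<String> liveReplicaRacks,
                                    int expectedStorageNum,
                                    boolean clusterEverMultiRack) {
        if (expectedStorageNum == 1 || !clusterEverMultiRack) {
          // One valid replica anywhere is enough in these cases.
          return !liveReplicaRacks.isEmpty();
        }
        // Otherwise replicas must span at least two distinct racks.
        return liveReplicaRacks.stream().distinct().count() >= 2;
      }
    }

isNeededReplication below then folds this into a single test: a block needs
work when current < expected or when the rack constraint is unmet.
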
@@ -3625,8 +3689,13 @@ public class BlockManager {
    * A block needs replication if the number of replicas is less than expected
    * or if it does not have enough racks.
    */
-  boolean isNeededReplication(Block b, int expected, int current) {
-    return current < expected || !blockHasEnoughRacks(b);
+  boolean isNeededReplication(BlockInfo storedBlock, int expected,
+      int current) {
+    return current < expected || !blockHasEnoughRacks(storedBlock, expected);
+  }
+
+  public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) {
+    return bc.getPreferredBlockReplication();
   }
   
   public long getMissingBlocksCount() {
@@ -3648,11 +3717,6 @@ public class BlockManager {
     return blocksMap.getBlockCollection(b);
   }
 
-  /** @return an iterator of the datanodes. */
-  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
-    return blocksMap.getStorages(block);
-  }
-
   public int numCorruptReplicas(Block block) {
     return corruptReplicas.numCorruptReplicas(block);
   }
@@ -3668,9 +3732,10 @@ public class BlockManager {
    * If a block is removed from blocksMap, remove it from excessReplicateMap.
    */
   private void removeFromExcessReplicateMap(Block block) {
-    for (DatanodeStorageInfo info : blocksMap.getStorages(block)) {
+    for (DatanodeStorageInfo info : getStorages(block)) {
       String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
-      LightWeightLinkedSet<Block> excessReplicas = excessReplicateMap.get(uuid);
+      LightWeightLinkedSet<BlockInfo> excessReplicas =
+          excessReplicateMap.get(uuid);
       if (excessReplicas != null) {
         if (excessReplicas.remove(block)) {
           excessBlocksCount.decrementAndGet();
@@ -3685,26 +3750,6 @@ public class BlockManager {
   public int getCapacity() {
     return blocksMap.getCapacity();
   }
-  
-  /**
-   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
-   * blocks starting at the next block after startingBlockId are returned
-   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
-   * is null, up to numExpectedBlocks blocks are returned from the beginning.
-   * If startingBlockId cannot be found, null is returned.
-   *
-   * @param numExpectedBlocks Number of block ids to return.
-   *  0 <= numExpectedBlocks <= 100
-   * @param startingBlockId Block id from which to start. If null, start at
-   *  beginning.
-   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
-   *
-   */
-  public long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
-                                   Long startingBlockId) {
-    return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
-                                                     startingBlockId);
-  }
 
   /**
    * Return an iterator over the set of blocks for which there are no replicas.
@@ -3879,7 +3924,7 @@ public class BlockManager {
 
   /**
    * A simple result enum for the result of
-   * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
+   * {@link BlockManager#processMisReplicatedBlock}.
    */
   enum MisReplicationResult {
     /** The block should be invalidated since it belongs to a deleted file. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 57e81b4..65b83e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -226,7 +227,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public AddBlockResult addBlock(BlockInfo b) {
+  public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
     // First check whether the block belongs to a different storage
     // on the same DN.
     AddBlockResult result = AddBlockResult.ADDED;
@@ -245,10 +246,18 @@ public class DatanodeStorageInfo {
     }
 
     // add to the head of the data-node list
-    b.addStorage(this);
+    b.addStorage(this, reportedBlock);
+    insertToList(b);
+    return result;
+  }
+
+  AddBlockResult addBlock(BlockInfo b) {
+    return addBlock(b, b);
+  }
+
+  public void insertToList(BlockInfo b) {
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return result;
   }
 
   public boolean removeBlock(BlockInfo b) {
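
The new two-argument addBlock keeps the old single-argument form as a thin
wrapper that reports the block as itself (addBlock(b, b)), so existing call
sites and mocks, such as the one in TestReplicationPolicy further down, keep
working while new callers can pass a distinct reported replica. Schematically,
with AddBlockResult reduced to a stub:

    class AddBlockDelegationSketch {
      enum AddBlockResult { ADDED, REPLACED, ALREADY_EXIST }

      AddBlockResult addBlock(Object storedBlock, Object reportedBlock) {
        // ... link storedBlock into this storage's block list, using
        // reportedBlock for any replica-specific state ...
        return AddBlockResult.ADDED;
      }

      // Old entry point: the block reports itself.
      AddBlockResult addBlock(Object b) {
        return addBlock(b, b);
      }
    }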

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b073a89..d0f4e08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -143,7 +143,6 @@ import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -3136,7 +3135,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (trackBlockCounts) {
         if (b.isComplete()) {
           numRemovedComplete++;
-          if (blockManager.checkMinReplication(b)) {
+          if (blockManager.hasMinStorage(b)) {
             numRemovedSafe++;
           }
         }
@@ -3368,7 +3367,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       curBlock = blocks[nrCompleteBlocks];
       if(!curBlock.isComplete())
         break;
-      assert blockManager.checkMinReplication(curBlock) :
+      assert blockManager.hasMinStorage(curBlock) :
               "A COMPLETE block is not minimally replicated in " + src;
     }
 
@@ -3404,7 +3403,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     // If penultimate block doesn't exist then its minReplication is met
     boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
-        blockManager.checkMinReplication(penultimateBlock);
+        blockManager.hasMinStorage(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
@@ -3413,7 +3412,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
       if(penultimateBlockMinReplication &&
-          blockManager.checkMinReplication(lastBlock)) {
+          blockManager.hasMinStorage(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
             iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3705,9 +3704,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
             if (storageInfo != null) {
               if(copyTruncate) {
-                storageInfo.addBlock(truncatedBlock);
+                storageInfo.addBlock(truncatedBlock, truncatedBlock);
               } else {
-                storageInfo.addBlock(storedBlock);
+                storageInfo.addBlock(storedBlock, storedBlock);
               }
             }
           }
@@ -3723,8 +3722,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         } else {
           iFile.setLastBlock(storedBlock, trimmedStorageInfos);
           if (closeFile) {
-            blockManager.markBlockReplicasAsCorrupt(storedBlock,
-                oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
+            blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
+                storedBlock, oldGenerationStamp, oldNumBytes,
+                trimmedStorageInfos);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 7d4cd7e..ab179b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -647,7 +647,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                   .getStorageType()));
             }
             if (showReplicaDetails) {
-              LightWeightLinkedSet<Block> blocksExcess =
+              LightWeightLinkedSet<BlockInfo> blocksExcess =
                   bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
               Collection<DatanodeDescriptor> corruptReplicas =
                   bm.getCorruptReplicas(block.getLocalBlock());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index 5126aa7..bae4f1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -63,7 +63,7 @@ public class TestBlockInfo {
 
     final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
 
-    boolean added = blockInfo.addStorage(storage);
+    boolean added = blockInfo.addStorage(storage, blockInfo);
 
     Assert.assertTrue(added);
     Assert.assertEquals(storage, blockInfo.getStorageInfo(0));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 396dff3..9e31670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -383,7 +383,7 @@ public class TestBlockManager {
     for (int i = 1; i < pipeline.length; i++) {
       DatanodeStorageInfo storage = pipeline[i];
       bm.addBlock(storage, blockInfo, null);
-      blockInfo.addStorage(storage);
+      blockInfo.addStorage(storage, blockInfo);
     }
   }
 
@@ -393,7 +393,7 @@ public class TestBlockManager {
 
     for (DatanodeDescriptor dn : nodes) {
       for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
-        blockInfo.addStorage(storage);
+        blockInfo.addStorage(storage, blockInfo);
       }
     }
     return blockInfo;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
index 1c3f075..c33667d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
@@ -100,7 +100,7 @@ public class TestNodeCount {
       DatanodeDescriptor nonExcessDN = null;
       for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
         final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
-        Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
+        Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
         if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
           nonExcessDN = dn;
           break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 2d7bb44..83b3aa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestOverReplicatedBlocks {
@@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks {
       // All replicas for deletion should be scheduled on lastDN.
       // And should not actually be deleted, because lastDN does not heartbeat.
       namesystem.readLock();
-      Collection<Block> dnBlocks = 
+      Collection<BlockInfo> dnBlocks =
         namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
       assertEquals("Replicas on node " + lastDNid + " should have been deleted",
           SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de480d6c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 2812957..44f0e65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -1250,7 +1250,7 @@ public class TestReplicationPolicy {
     when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
     when(storage.addBlock(any(BlockInfo.class))).thenReturn
         (DatanodeStorageInfo.AddBlockResult.ADDED);
-    ucBlock.addStorage(storage);
+    ucBlock.addStorage(storage, ucBlock);
 
     when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
     .thenReturn(ucBlock);