You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2016/03/17 00:54:18 UTC

[2/2] hadoop git commit: HDFS-9857. Erasure Coding: Rename replication-based names in BlockManager to more generic [part-1]. Contributed by Rakesh R.

HDFS-9857. Erasure Coding: Rename replication-based names in BlockManager to more generic [part-1]. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/32d043d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/32d043d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/32d043d9

Branch: refs/heads/trunk
Commit: 32d043d9c5f4615058ea4f65a58ba271ba47fcb5
Parents: 605fdcb
Author: Zhe Zhang <ze...@zezhang-ld1.linkedin.biz>
Authored: Wed Mar 16 16:53:58 2016 -0700
Committer: Zhe Zhang <ze...@zezhang-ld1.linkedin.biz>
Committed: Wed Mar 16 16:53:58 2016 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 268 +++++------
 .../blockmanagement/DecommissionManager.java    |  30 +-
 .../blockmanagement/LowRedundancyBlocks.java    | 458 +++++++++++++++++++
 .../blockmanagement/UnderReplicatedBlocks.java  | 448 ------------------
 .../blockmanagement/BlockManagerTestUtil.java   |   2 +-
 .../blockmanagement/TestBlockManager.java       |  20 +-
 .../TestLowRedundancyBlockQueues.java           | 182 ++++++++
 .../blockmanagement/TestPendingReplication.java |  14 +-
 .../blockmanagement/TestReplicationPolicy.java  | 158 +++----
 .../TestUnderReplicatedBlockQueues.java         | 182 --------
 .../hdfs/server/namenode/TestMetaSave.java      |   2 +-
 11 files changed, 891 insertions(+), 873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6ed102c..66ab789 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -149,7 +149,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private volatile long pendingReplicationBlocksCount = 0L;
   private volatile long corruptReplicaBlocksCount = 0L;
-  private volatile long underReplicatedBlocksCount = 0L;
+  private volatile long lowRedundancyBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
 
   /** flag indicating whether replication queues have been initialized */
@@ -166,7 +166,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
   /** Used by metrics */
   public long getUnderReplicatedBlocksCount() {
-    return underReplicatedBlocksCount;
+    return lowRedundancyBlocksCount;
   }
   /** Used by metrics */
   public long getCorruptReplicaBlocksCount() {
@@ -250,9 +250,10 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**
    * Store set of Blocks that need to be replicated 1 or more times.
-   * We also store pending replication-orders.
+   * We also store pending reconstruction-orders.
    */
-  public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+  public final LowRedundancyBlocks neededReconstruction =
+      new LowRedundancyBlocks();
 
   @VisibleForTesting
   final PendingReplicationBlocks pendingReplications;
@@ -294,20 +295,20 @@ public class BlockManager implements BlockStatsMXBean {
   private boolean shouldPostponeBlocksFromFuture = false;
 
   /**
-   * Process replication queues asynchronously to allow namenode safemode exit
-   * and failover to be faster. HDFS-5496
+   * Process reconstruction queues asynchronously to allow namenode safemode
+   * exit and failover to be faster. HDFS-5496.
    */
-  private Daemon replicationQueuesInitializer = null;
+  private Daemon reconstructionQueuesInitializer = null;
   /**
-   * Number of blocks to process asychronously for replication queues
+   * Number of blocks to process asychronously for reconstruction queues
    * initialization once aquired the namesystem lock. Remaining blocks will be
    * processed again after aquiring lock again.
    */
   private int numBlocksPerIteration;
   /**
-   * Progress of the Replication queues initialisation.
+   * Progress of the Reconstruction queues initialisation.
    */
-  private double replicationQueuesInitProgress = 0.0;
+  private double reconstructionQueuesInitProgress = 0.0;
 
   /** for block replicas placement */
   private BlockPlacementPolicies placementPolicies;
@@ -576,12 +577,12 @@ public class BlockManager implements BlockStatsMXBean {
     out.println("Live Datanodes: " + live.size());
     out.println("Dead Datanodes: " + dead.size());
     //
-    // Dump contents of neededReplication
+    // Dump contents of neededReconstruction
     //
-    synchronized (neededReplications) {
-      out.println("Metasave: Blocks waiting for replication: " + 
-                  neededReplications.size());
-      for (Block block : neededReplications) {
+    synchronized (neededReconstruction) {
+      out.println("Metasave: Blocks waiting for reconstruction: "
+          + neededReconstruction.size());
+      for (Block block : neededReconstruction) {
         dumpBlockMeta(block, out);
       }
     }
@@ -616,7 +617,7 @@ public class BlockManager implements BlockStatsMXBean {
     // source node returned is not used
     chooseSourceDatanodes(getStoredBlock(block), containingNodes,
         containingLiveReplicasNodes, numReplicas,
-        new LinkedList<Byte>(), UnderReplicatedBlocks.LEVEL);
+        new LinkedList<Byte>(), LowRedundancyBlocks.LEVEL);
     
     // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are 
     // not included in the numReplicas.liveReplicas() count
@@ -849,9 +850,9 @@ public class BlockManager implements BlockStatsMXBean {
     // is happening
     bc.convertLastBlockToUC(lastBlock, targets);
 
-    // Remove block from replication queue.
+    // Remove block from reconstruction queue.
     NumberReplicas replicas = countNodes(lastBlock);
-    neededReplications.remove(lastBlock, replicas.liveReplicas(),
+    neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
         replicas.readOnlyReplicas(),
         replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
     pendingReplications.remove(lastBlock);
@@ -1365,8 +1366,8 @@ public class BlockManager implements BlockStatsMXBean {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node, numberOfReplicas);
     } else if (isPopulatingReplQueues()) {
-      // add the block to neededReplication
-      updateNeededReplications(b.getStored(), -1, 0);
+      // add the block to neededReconstruction
+      updateNeededReconstructions(b.getStored(), -1, 0);
     }
   }
 
@@ -1418,13 +1419,13 @@ public class BlockManager implements BlockStatsMXBean {
   
   void updateState() {
     pendingReplicationBlocksCount = pendingReplications.size();
-    underReplicatedBlocksCount = neededReplications.size();
+    lowRedundancyBlocksCount = neededReconstruction.size();
     corruptReplicaBlocksCount = corruptReplicas.size();
   }
 
-  /** Return number of under-replicated but not missing blocks */
+  /** Return number of low redundancy blocks but not missing blocks. */
   public int getUnderReplicatedNotMissingBlocks() {
-    return neededReplications.getUnderReplicatedBlockCount();
+    return neededReconstruction.getLowRedundancyBlockCount();
   }
   
   /**
@@ -1452,25 +1453,26 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Scan blocks in {@link #neededReplications} and assign reconstruction
+   * Scan blocks in {@link #neededReconstruction} and assign reconstruction
    * (replication or erasure coding) work to data-nodes they belong to.
    *
    * The number of process blocks equals either twice the number of live
-   * data-nodes or the number of under-replicated blocks whichever is less.
+   * data-nodes or the number of low redundancy blocks whichever is less.
    *
-   * @return number of blocks scheduled for replication during this iteration.
+   * @return number of blocks scheduled for reconstruction during this
+   *         iteration.
    */
   int computeBlockReconstructionWork(int blocksToProcess) {
-    List<List<BlockInfo>> blocksToReplicate = null;
+    List<List<BlockInfo>> blocksToReconstruct = null;
     namesystem.writeLock();
     try {
-      // Choose the blocks to be replicated
-      blocksToReplicate = neededReplications
-          .chooseUnderReplicatedBlocks(blocksToProcess);
+      // Choose the blocks to be reconstructed
+      blocksToReconstruct = neededReconstruction
+          .chooseLowRedundancyBlocks(blocksToProcess);
     } finally {
       namesystem.writeUnlock();
     }
-    return computeReconstructionWorkForBlocks(blocksToReplicate);
+    return computeReconstructionWorkForBlocks(blocksToReconstruct);
   }
 
   /**
@@ -1489,7 +1491,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Step 1: categorize at-risk blocks into replication and EC tasks
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
+      synchronized (neededReconstruction) {
         for (int priority = 0; priority < blocksToReconstruct
             .size(); priority++) {
           for (BlockInfo block : blocksToReconstruct.get(priority)) {
@@ -1533,7 +1535,7 @@ public class BlockManager implements BlockStatsMXBean {
           continue;
         }
 
-        synchronized (neededReplications) {
+        synchronized (neededReconstruction) {
           if (validateReconstructionWork(rw)) {
             scheduledWork++;
           }
@@ -1544,7 +1546,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     if (blockLog.isDebugEnabled()) {
-      // log which blocks have been scheduled for replication
+      // log which blocks have been scheduled for reconstruction
       for(BlockReconstructionWork rw : reconWork){
         DatanodeStorageInfo[] targets = rw.getTargets();
         if (targets != null && targets.length != 0) {
@@ -1558,8 +1560,9 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
 
-      blockLog.debug("BLOCK* neededReplications = {} pendingReplications = {}",
-          neededReplications.size(), pendingReplications.size());
+      blockLog.debug(
+          "BLOCK* neededReconstruction = {} pendingReplications = {}",
+          neededReconstruction.size(), pendingReplications.size());
     }
 
     return scheduledWork;
@@ -1576,8 +1579,8 @@ public class BlockManager implements BlockStatsMXBean {
       int priority) {
     // skip abandoned block or block reopened for append
     if (block.isDeleted() || !block.isCompleteOrCommitted()) {
-      // remove from neededReplications
-      neededReplications.remove(block, priority);
+      // remove from neededReconstruction
+      neededReconstruction.remove(block, priority);
       return null;
     }
 
@@ -1605,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean {
     int pendingNum = pendingReplications.getNumReplicas(block);
     if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
         requiredReplication)) {
-      neededReplications.remove(block, priority);
-      blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+      neededReconstruction.remove(block, priority);
+      blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
           " it has enough replicas", block);
       return null;
     }
@@ -1662,7 +1665,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Recheck since global lock was released
     // skip abandoned block or block reopened for append
     if (block.isDeleted() || !block.isCompleteOrCommitted()) {
-      neededReplications.remove(block, priority);
+      neededReconstruction.remove(block, priority);
       rw.resetTargets();
       return false;
     }
@@ -1673,7 +1676,7 @@ public class BlockManager implements BlockStatsMXBean {
     final int pendingNum = pendingReplications.getNumReplicas(block);
     if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
         requiredReplication)) {
-      neededReplications.remove(block, priority);
+      neededReconstruction.remove(block, priority);
       rw.resetTargets();
       blockLog.debug("BLOCK* Removing {} from neededReplications as" +
           " it has enough replicas", block);
@@ -1705,9 +1708,9 @@ public class BlockManager implements BlockStatsMXBean {
         + "pendingReplications", block);
 
     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
-    // remove from neededReplications
+    // remove from neededReconstruction
     if(numEffectiveReplicas + targets.length >= requiredReplication) {
-      neededReplications.remove(block, priority);
+      neededReconstruction.remove(block, priority);
     }
     return true;
   }
@@ -1852,7 +1855,7 @@ public class BlockManager implements BlockStatsMXBean {
         continue;
       }
 
-      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY 
+      if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
           && !node.isDecommissionInProgress() 
           && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
         continue; // already reached replication limit
@@ -1905,9 +1908,10 @@ public class BlockManager implements BlockStatsMXBean {
             continue;
           }
           NumberReplicas num = countNodes(timedOutItems[i]);
-          if (isNeededReplication(bi, num.liveReplicas())) {
-            neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
-                num.decommissionedAndDecommissioning(), getReplication(bi));
+          if (isNeededReconstruction(bi, num.liveReplicas())) {
+            neededReconstruction.add(bi, num.liveReplicas(),
+                num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+                getReplication(bi));
           }
         }
       } finally {
@@ -2777,7 +2781,7 @@ public class BlockManager implements BlockStatsMXBean {
    * intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock(). Assumes this method is
    * called "immediately" so there is no need to refresh the storedBlock from
-   * blocksMap. Doesn't handle underReplication/overReplication, or worry about
+   * blocksMap. Doesn't handle low redundancy/extra redundancy, or worry about
    * pendingReplications or corruptReplicas, because it's in startup safe mode.
    * Doesn't log every block, because there are typically millions of them.
    * 
@@ -2812,7 +2816,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**
    * Modify (block-->datanode) map. Remove block from set of
-   * needed replications if this takes care of the problem.
+   * needed reconstruction if this takes care of the problem.
    * @return the block that is stored in blocksMap.
    */
   private Block addStoredBlock(final BlockInfo block,
@@ -2890,24 +2894,25 @@ public class BlockManager implements BlockStatsMXBean {
       return storedBlock;
     }
 
-    // do not try to handle over/under-replicated blocks during first safe mode
+    // do not try to handle extra/low redundancy blocks during first safe mode
     if (!isPopulatingReplQueues()) {
       return storedBlock;
     }
 
-    // handle underReplication/overReplication
+    // handle low redundancy/extra redundancy
     short fileReplication = getExpectedReplicaNum(storedBlock);
-    if (!isNeededReplication(storedBlock, numCurrentReplica)) {
-      neededReplications.remove(storedBlock, numCurrentReplica,
+    if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
+      neededReconstruction.remove(storedBlock, numCurrentReplica,
           num.readOnlyReplicas(),
           num.decommissionedAndDecommissioning(), fileReplication);
     } else {
-      updateNeededReplications(storedBlock, curReplicaDelta, 0);
+      updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
     }
-    if (shouldProcessOverReplicated(num, fileReplication)) {
-      processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
+    if (shouldProcessExtraRedundancy(num, fileReplication)) {
+      processExtraRedundancyBlock(storedBlock, fileReplication, node,
+          delNodeHint);
     }
-    // If the file replication has reached desired value
+    // If the file redundancy has reached desired value
     // we can remove any corrupt replicas the block may have
     int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
     int numCorruptNodes = num.corruptReplicas();
@@ -2922,7 +2927,7 @@ public class BlockManager implements BlockStatsMXBean {
     return storedBlock;
   }
 
-  private boolean shouldProcessOverReplicated(NumberReplicas num,
+  private boolean shouldProcessExtraRedundancy(NumberReplicas num,
       int expectedNum) {
     final int numCurrent = num.liveReplicas();
     return numCurrent > expectedNum ||
@@ -2972,42 +2977,44 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**
    * For each block in the name-node verify whether it belongs to any file,
-   * over or under replicated. Place it into the respective queue.
+   * extra or low redundancy. Place it into the respective queue.
    */
   public void processMisReplicatedBlocks() {
     assert namesystem.hasWriteLock();
-    stopReplicationInitializer();
-    neededReplications.clear();
-    replicationQueuesInitializer = new Daemon() {
+    stopReconstructionInitializer();
+    neededReconstruction.clear();
+    reconstructionQueuesInitializer = new Daemon() {
 
       @Override
       public void run() {
         try {
           processMisReplicatesAsync();
         } catch (InterruptedException ie) {
-          LOG.info("Interrupted while processing replication queues.");
+          LOG.info("Interrupted while processing reconstruction queues.");
         } catch (Exception e) {
-          LOG.error("Error while processing replication queues async", e);
+          LOG.error("Error while processing reconstruction queues async", e);
         }
       }
     };
-    replicationQueuesInitializer.setName("Replication Queue Initializer");
-    replicationQueuesInitializer.start();
+    reconstructionQueuesInitializer
+        .setName("Reconstruction Queue Initializer");
+    reconstructionQueuesInitializer.start();
   }
 
   /*
-   * Stop the ongoing initialisation of replication queues
+   * Stop the ongoing initialisation of reconstruction queues
    */
-  private void stopReplicationInitializer() {
-    if (replicationQueuesInitializer != null) {
-      replicationQueuesInitializer.interrupt();
+  private void stopReconstructionInitializer() {
+    if (reconstructionQueuesInitializer != null) {
+      reconstructionQueuesInitializer.interrupt();
       try {
-        replicationQueuesInitializer.join();
+        reconstructionQueuesInitializer.join();
       } catch (final InterruptedException e) {
-        LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
+        LOG.warn("Interrupted while waiting for "
+            + "reconstructionQueueInitializer. Returning..");
         return;
       } finally {
-        replicationQueuesInitializer = null;
+        reconstructionQueuesInitializer = null;
       }
     }
   }
@@ -3025,7 +3032,7 @@ public class BlockManager implements BlockStatsMXBean {
     long startTimeMisReplicatedScan = Time.monotonicNow();
     Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
     long totalBlocks = blocksMap.size();
-    replicationQueuesInitProgress = 0;
+    reconstructionQueuesInitProgress = 0;
     long totalProcessed = 0;
     long sleepDuration =
         Math.max(1, Math.min(numBlocksPerIteration/1000, 10000));
@@ -3067,7 +3074,7 @@ public class BlockManager implements BlockStatsMXBean {
         totalProcessed += processed;
         // there is a possibility that if any of the blocks deleted/added during
         // initialisation, then progress might be different.
-        replicationQueuesInitProgress = Math.min((double) totalProcessed
+        reconstructionQueuesInitProgress = Math.min((double) totalProcessed
             / totalBlocks, 1.0);
 
         if (!blocksItr.hasNext()) {
@@ -3097,12 +3104,12 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Get the progress of the Replication queues initialisation
+   * Get the progress of the reconstruction queues initialisation
    * 
    * @return Returns values between 0 and 1 for the progress.
    */
-  public double getReplicationQueuesInitProgress() {
-    return replicationQueuesInitProgress;
+  public double getReconstructionQueuesInitProgress() {
+    return reconstructionQueuesInitProgress;
   }
 
   /**
@@ -3134,15 +3141,16 @@ public class BlockManager implements BlockStatsMXBean {
     short expectedReplication = getExpectedReplicaNum(block);
     NumberReplicas num = countNodes(block);
     final int numCurrentReplica = num.liveReplicas();
-    // add to under-replicated queue if need to be
-    if (isNeededReplication(block, numCurrentReplica)) {
-      if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
-          num.decommissionedAndDecommissioning(), expectedReplication)) {
+    // add to low redundancy queue if need to be
+    if (isNeededReconstruction(block, numCurrentReplica)) {
+      if (neededReconstruction.add(block, numCurrentReplica,
+          num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+          expectedReplication)) {
         return MisReplicationResult.UNDER_REPLICATED;
       }
     }
 
-    if (shouldProcessOverReplicated(num, expectedReplication)) {
+    if (shouldProcessExtraRedundancy(num, expectedReplication)) {
       if (num.replicasOnStaleNodes() > 0) {
         // If any of the replicas of this block are on nodes that are
         // considered "stale", then these replicas may in fact have
@@ -3152,8 +3160,8 @@ public class BlockManager implements BlockStatsMXBean {
         return MisReplicationResult.POSTPONE;
       }
       
-      // over-replicated block
-      processOverReplicatedBlock(block, expectedReplication, null, null);
+      // extra redundancy block
+      processExtraRedundancyBlock(block, expectedReplication, null, null);
       return MisReplicationResult.OVER_REPLICATED;
     }
     
@@ -3167,12 +3175,12 @@ public class BlockManager implements BlockStatsMXBean {
       return;
     }
 
-    // update needReplication priority queues
+    // update neededReconstruction priority queues
     b.setReplication(newRepl);
-    updateNeededReplications(b, 0, newRepl - oldRepl);
+    updateNeededReconstructions(b, 0, newRepl - oldRepl);
 
     if (oldRepl > newRepl) {
-      processOverReplicatedBlock(b, newRepl, null, null);
+      processExtraRedundancyBlock(b, newRepl, null, null);
     }
   }
 
@@ -3181,7 +3189,7 @@ public class BlockManager implements BlockStatsMXBean {
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  private void processOverReplicatedBlock(final BlockInfo block,
+  private void processExtraRedundancyBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
@@ -3405,7 +3413,7 @@ public class BlockManager implements BlockStatsMXBean {
       //
       if (!storedBlock.isDeleted()) {
         bmSafeMode.decrementSafeBlockCount(storedBlock);
-        updateNeededReplications(storedBlock, -1, 0);
+        updateNeededReconstructions(storedBlock, -1, 0);
       }
 
       excessReplicas.remove(node, storedBlock);
@@ -3748,29 +3756,29 @@ public class BlockManager implements BlockStatsMXBean {
   
   /**
    * On stopping decommission, check if the node has excess replicas.
-   * If there are any excess replicas, call processOverReplicatedBlock().
-   * Process over replicated blocks only when active NN is out of safe mode.
+   * If there are any excess replicas, call processExtraRedundancyBlock().
+   * Process extra redundancy blocks only when active NN is out of safe mode.
    */
-  void processOverReplicatedBlocksOnReCommission(
+  void processExtraRedundancyBlocksOnReCommission(
       final DatanodeDescriptor srcNode) {
     if (!isPopulatingReplQueues()) {
       return;
     }
     final Iterator<BlockInfo> it = srcNode.getBlockIterator();
-    int numOverReplicated = 0;
+    int numExtraRedundancy = 0;
     while(it.hasNext()) {
       final BlockInfo block = it.next();
       int expectedReplication = this.getReplication(block);
       NumberReplicas num = countNodes(block);
-      if (shouldProcessOverReplicated(num, expectedReplication)) {
-        // over-replicated block
-        processOverReplicatedBlock(block, (short) expectedReplication, null,
+      if (shouldProcessExtraRedundancy(num, expectedReplication)) {
+        // extra redundancy block
+        processExtraRedundancyBlock(block, (short) expectedReplication, null,
             null);
-        numOverReplicated++;
+        numExtraRedundancy++;
       }
     }
-    LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
-        srcNode + " during recommissioning");
+    LOG.info("Invalidated " + numExtraRedundancy
+        + " extra redundancy blocks on " + srcNode + " during recommissioning");
   }
 
   /**
@@ -3789,9 +3797,9 @@ public class BlockManager implements BlockStatsMXBean {
 
     updateState();
     if (pendingReplicationBlocksCount == 0 &&
-        underReplicatedBlocksCount == 0) {
-      LOG.info("Node {} is dead and there are no under-replicated" +
-          " blocks or blocks pending replication. Safe to decommission.",
+        lowRedundancyBlocksCount == 0) {
+      LOG.info("Node {} is dead and there are no low redundancy" +
+          " blocks or blocks pending reconstruction. Safe to decommission.",
           node);
       return true;
     }
@@ -3835,9 +3843,9 @@ public class BlockManager implements BlockStatsMXBean {
     block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     removeBlockFromMap(block);
-    // Remove the block from pendingReplications and neededReplications
+    // Remove the block from pendingReplications and neededReconstruction
     pendingReplications.remove(block);
-    neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
+    neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
     if (postponedMisreplicatedBlocks.remove(block)) {
       postponedMisreplicatedBlocksCount.decrementAndGet();
     }
@@ -3859,8 +3867,8 @@ public class BlockManager implements BlockStatsMXBean {
         new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
   }
 
-  /** updates a block in under replication queue */
-  private void updateNeededReplications(final BlockInfo block,
+  /** updates a block in needed reconstruction queue. */
+  private void updateNeededReconstructions(final BlockInfo block,
       final int curReplicasDelta, int expectedReplicasDelta) {
     namesystem.writeLock();
     try {
@@ -3869,14 +3877,14 @@ public class BlockManager implements BlockStatsMXBean {
       }
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
-      if (isNeededReplication(block, repl.liveReplicas())) {
-        neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
-            repl.decommissionedAndDecommissioning(), curExpectedReplicas,
-            curReplicasDelta, expectedReplicasDelta);
+      if (isNeededReconstruction(block, repl.liveReplicas())) {
+        neededReconstruction.update(block, repl.liveReplicas(),
+            repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+            curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
       } else {
         int oldReplicas = repl.liveReplicas()-curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-        neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
+        neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
       }
     } finally {
@@ -3885,10 +3893,10 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Check replication of the blocks in the collection.
-   * If any block is needed replication, insert it into the replication queue.
+   * Check sufficient redundancy of the blocks in the collection. If any block
+   * is needed reconstruction, insert it into the reconstruction queue.
    * Otherwise, if the block is more than the expected replication factor,
-   * process it as an over replicated block.
+   * process it as an extra redundancy block.
    */
   public void checkReplication(BlockCollection bc) {
     for (BlockInfo block : bc.getBlocks()) {
@@ -3896,11 +3904,11 @@ public class BlockManager implements BlockStatsMXBean {
       final NumberReplicas n = countNodes(block);
       final int pending = pendingReplications.getNumReplicas(block);
       if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
-        neededReplications.add(block, n.liveReplicas() + pending,
+        neededReconstruction.add(block, n.liveReplicas() + pending,
             n.readOnlyReplicas(),
             n.decommissionedAndDecommissioning(), expected);
-      } else if (shouldProcessOverReplicated(n, expected)) {
-        processOverReplicatedBlock(block, expected, null, null);
+      } else if (shouldProcessExtraRedundancy(n, expected)) {
+        processExtraRedundancyBlock(block, expected, null, null);
       }
     }
   }
@@ -3926,7 +3934,7 @@ public class BlockManager implements BlockStatsMXBean {
     try {
       // blocks should not be replicated or removed if safe mode is on
       if (namesystem.isInSafeMode()) {
-        LOG.debug("In safemode, not computing replication work");
+        LOG.debug("In safemode, not computing reconstruction work");
         return 0;
       }
       try {
@@ -3980,10 +3988,10 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * A block needs replication if the number of replicas is less than expected
-   * or if it does not have enough racks.
+   * A block needs reconstruction if the number of replicas is less than
+   * expected or if it does not have enough racks.
    */
-  boolean isNeededReplication(BlockInfo storedBlock, int current) {
+  boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
     int expected = getExpectedReplicaNum(storedBlock);
     return storedBlock.isComplete()
         && (current < expected || !isPlacementPolicySatisfied(storedBlock));
@@ -3997,12 +4005,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   public long getMissingBlocksCount() {
     // not locking
-    return this.neededReplications.getCorruptBlockSize();
+    return this.neededReconstruction.getCorruptBlockSize();
   }
 
   public long getMissingReplOneBlocksCount() {
     // not locking
-    return this.neededReplications.getCorruptReplOneBlockSize();
+    return this.neededReconstruction.getCorruptReplOneBlockSize();
   }
 
   public BlockInfo addBlockCollection(BlockInfo block,
@@ -4050,8 +4058,8 @@ public class BlockManager implements BlockStatsMXBean {
    * Return an iterator over the set of blocks for which there are no replicas.
    */
   public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
-    return neededReplications.iterator(
-        UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+    return neededReconstruction.iterator(
+        LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
 
   /**
@@ -4070,7 +4078,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** @return the size of UnderReplicatedBlocks */
   public int numOfUnderReplicatedBlocks() {
-    return neededReplications.size();
+    return neededReconstruction.size();
   }
 
   /**
@@ -4232,7 +4240,7 @@ public class BlockManager implements BlockStatsMXBean {
    * this NameNode.
    */
   public void clearQueues() {
-    neededReplications.clear();
+    neededReconstruction.clear();
     pendingReplications.clear();
     excessReplicas.clear();
     invalidateBlocks.clear();
@@ -4298,7 +4306,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public void shutdown() {
-    stopReplicationInitializer();
+    stopReconstructionInitializer();
     blocksMap.close();
     MBeans.unregister(mxBeanName);
     mxBeanName = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 480670a..3b5f103 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -215,10 +215,10 @@ public class DecommissionManager {
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopDecommission(node);
-      // Over-replicated blocks will be detected and processed when
+      // extra redundancy blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {
-        blockManager.processOverReplicatedBlocksOnReCommission(node);
+        blockManager.processExtraRedundancyBlocksOnReCommission(node);
       }
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
@@ -513,9 +513,9 @@ public class DecommissionManager {
         final List<BlockInfo> insufficientList,
         boolean pruneReliableBlocks) {
       boolean firstReplicationLog = true;
-      int underReplicatedBlocks = 0;
+      int lowRedundancyBlocks = 0;
       int decommissionOnlyReplicas = 0;
-      int underReplicatedInOpenFiles = 0;
+      int lowRedundancyInOpenFiles = 0;
       while (it.hasNext()) {
         numBlocksChecked++;
         final BlockInfo block = it.next();
@@ -537,22 +537,22 @@ public class DecommissionManager {
         final NumberReplicas num = blockManager.countNodes(block);
         final int liveReplicas = num.liveReplicas();
 
-        // Schedule under-replicated blocks for replication if not already
+        // Schedule low redundancy blocks for reconstruction if not already
         // pending
-        if (blockManager.isNeededReplication(block, liveReplicas)) {
-          if (!blockManager.neededReplications.contains(block) &&
+        if (blockManager.isNeededReconstruction(block, liveReplicas)) {
+          if (!blockManager.neededReconstruction.contains(block) &&
               blockManager.pendingReplications.getNumReplicas(block) == 0 &&
               blockManager.isPopulatingReplQueues()) {
             // Process these blocks only when active NN is out of safe mode.
-            blockManager.neededReplications.add(block,
+            blockManager.neededReconstruction.add(block,
                 liveReplicas, num.readOnlyReplicas(),
                 num.decommissionedAndDecommissioning(),
                 blockManager.getExpectedReplicaNum(block));
           }
         }
 
-        // Even if the block is under-replicated, 
-        // it doesn't block decommission if it's sufficiently replicated
+        // Even if the block is without sufficient redundancy,
+        // it doesn't block decommission if has sufficient redundancy
         if (isSufficient(block, bc, num)) {
           if (pruneReliableBlocks) {
             it.remove();
@@ -560,7 +560,7 @@ public class DecommissionManager {
           continue;
         }
 
-        // We've found an insufficiently replicated block.
+        // We've found a block without sufficient redundancy.
         if (insufficientList != null) {
           insufficientList.add(block);
         }
@@ -571,18 +571,18 @@ public class DecommissionManager {
           firstReplicationLog = false;
         }
         // Update various counts
-        underReplicatedBlocks++;
+        lowRedundancyBlocks++;
         if (bc.isUnderConstruction()) {
-          underReplicatedInOpenFiles++;
+          lowRedundancyInOpenFiles++;
         }
         if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
           decommissionOnlyReplicas++;
         }
       }
 
-      datanode.decommissioningStatus.set(underReplicatedBlocks,
+      datanode.decommissioningStatus.set(lowRedundancyBlocks,
           decommissionOnlyReplicas,
-          underReplicatedInOpenFiles);
+          lowRedundancyInOpenFiles);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
new file mode 100644
index 0000000..de8cf4e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+
+/**
+ * Keep prioritized queues of low redundant blocks.
+ * Blocks have redundancy priority, with priority
+ * {@link #QUEUE_HIGHEST_PRIORITY} indicating the highest priority.
+ * </p>
+ * Having a prioritised queue allows the {@link BlockManager} to select
+ * which blocks to replicate first -it tries to give priority to data
+ * that is most at risk or considered most valuable.
+ *
+ * <p/>
+ * The policy for choosing which priority to give added blocks
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
+ * </p>
+ * <p>The queue order is as follows:</p>
+ * <ol>
+ *   <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that should be redundant
+ *   first. That is blocks with only one copy, or blocks with zero live
+ *   copies but a copy in a node being decommissioned. These blocks
+ *   are at risk of loss if the disk or server on which they
+ *   remain fails.</li>
+ *   <li>{@link #QUEUE_VERY_LOW_REDUNDANCY}: blocks that are very
+ *   under-replicated compared to their expected values. Currently
+ *   that means the ratio of the ratio of actual:expected means that
+ *   there is <i>less than</i> 1:3.</li>. These blocks may not be at risk,
+ *   but they are clearly considered "important".
+ *   <li>{@link #QUEUE_LOW_REDUNDANCY}: blocks that are also under
+ *   replicated, and the ratio of actual:expected is good enough that
+ *   they do not need to go into the {@link #QUEUE_VERY_LOW_REDUNDANCY}
+ *   queue.</li>
+ *   <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as
+ *   many copies of a block as required, but the blocks are not adequately
+ *   distributed. Loss of a rack/switch could take all copies off-line.</li>
+ *   <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
+ *   and for which there are no-non-corrupt copies (currently) available.
+ *   The policy here is to keep those corrupt blocks replicated, but give
+ *   blocks that are not corrupt higher priority.</li>
+ * </ol>
+ */
+class LowRedundancyBlocks implements Iterable<BlockInfo> {
+  /** The total number of queues : {@value} */
+  static final int LEVEL = 5;
+  /** The queue with the highest priority: {@value} */
+  static final int QUEUE_HIGHEST_PRIORITY = 0;
+  /** The queue for blocks that are way below their expected value : {@value} */
+  static final int QUEUE_VERY_LOW_REDUNDANCY = 1;
+  /**
+   * The queue for "normally" without sufficient redundancy blocks : {@value}.
+   */
+  static final int QUEUE_LOW_REDUNDANCY = 2;
+  /** The queue for blocks that have the right number of replicas,
+   * but which the block manager felt were badly distributed: {@value}
+   */
+  static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
+  /** The queue for corrupt blocks: {@value} */
+  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
+  /** the queues themselves */
+  private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues
+      = new ArrayList<>(LEVEL);
+
+  /** The number of corrupt blocks with replication factor 1 */
+  private int corruptReplOneBlocks = 0;
+
+  /** Create an object. */
+  LowRedundancyBlocks() {
+    for (int i = 0; i < LEVEL; i++) {
+      priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
+    }
+  }
+
+  /**
+   * Empty the queues.
+   */
+  synchronized void clear() {
+    for (int i = 0; i < LEVEL; i++) {
+      priorityQueues.get(i).clear();
+    }
+    corruptReplOneBlocks = 0;
+  }
+
+  /** Return the total number of insufficient redundancy blocks. */
+  synchronized int size() {
+    int size = 0;
+    for (int i = 0; i < LEVEL; i++) {
+      size += priorityQueues.get(i).size();
+    }
+    return size;
+  }
+
+  /**
+   * Return the number of insufficiently redundant blocks excluding corrupt
+   * blocks.
+   */
+  synchronized int getLowRedundancyBlockCount() {
+    int size = 0;
+    for (int i = 0; i < LEVEL; i++) {
+      if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
+        size += priorityQueues.get(i).size();
+      }
+    }
+    return size;
+  }
+
+  /** Return the number of corrupt blocks */
+  synchronized int getCorruptBlockSize() {
+    return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
+  }
+
+  /** Return the number of corrupt blocks with replication factor 1 */
+  synchronized int getCorruptReplOneBlockSize() {
+    return corruptReplOneBlocks;
+  }
+
+  /** Check if a block is in the neededReconstruction queue. */
+  synchronized boolean contains(BlockInfo block) {
+    for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
+      if (set.contains(block)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Return the priority of a block
+   * @param curReplicas current number of replicas of the block
+   * @param expectedReplicas expected number of replicas of the block
+   * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
+   */
+  private int getPriority(BlockInfo block,
+                          int curReplicas,
+                          int readOnlyReplicas,
+                          int decommissionedReplicas,
+                          int expectedReplicas) {
+    assert curReplicas >= 0 : "Negative replicas!";
+    if (curReplicas >= expectedReplicas) {
+      // Block has enough copies, but not enough racks
+      return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
+    }
+    if (block.isStriped()) {
+      BlockInfoStriped sblk = (BlockInfoStriped) block;
+      return getPriorityStriped(curReplicas, decommissionedReplicas,
+          sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+    } else {
+      return getPriorityContiguous(curReplicas, readOnlyReplicas,
+          decommissionedReplicas, expectedReplicas);
+    }
+  }
+
+  private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
+      int decommissionedReplicas, int expectedReplicas) {
+    if (curReplicas == 0) {
+      // If there are zero non-decommissioned replicas but there are
+      // some decommissioned replicas, then assign them highest priority
+      if (decommissionedReplicas > 0) {
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      if (readOnlyReplicas > 0) {
+        // only has read-only replicas, highest risk
+        // since the read-only replicas may go down all together.
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      //all we have are corrupt blocks
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == 1) {
+      // only one replica, highest risk of loss
+      // highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas * 3) < expectedReplicas) {
+      //can only afford one replica loss
+      //this is considered very insufficiently redundant blocks.
+      return QUEUE_VERY_LOW_REDUNDANCY;
+    } else {
+      //add to the normal queue for insufficiently redundant blocks
+      return QUEUE_LOW_REDUNDANCY;
+    }
+  }
+
+  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+      short dataBlkNum, short parityBlkNum) {
+    if (curReplicas < dataBlkNum) {
+      // There are some replicas on decommissioned nodes so it's not corrupted
+      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == dataBlkNum) {
+      // highest risk of loss, highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+      // can only afford one replica loss
+      // this is considered very insufficiently redundant blocks.
+      return QUEUE_VERY_LOW_REDUNDANCY;
+    } else {
+      // add to the normal queue for insufficiently redundant blocks.
+      return QUEUE_LOW_REDUNDANCY;
+    }
+  }
+
+  /**
+   * Add a block to insufficiently redundant queue according to its priority.
+   *
+   * @param block a low redundancy block
+   * @param curReplicas current number of replicas of the block
+   * @param decomissionedReplicas the number of decommissioned replicas
+   * @param expectedReplicas expected number of replicas of the block
+   * @return true if the block was added to a queue.
+   */
+  synchronized boolean add(BlockInfo block,
+                           int curReplicas,
+                           int readOnlyReplicas,
+                           int decomissionedReplicas,
+                           int expectedReplicas) {
+    assert curReplicas >= 0 : "Negative replicas!";
+    final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
+        decomissionedReplicas, expectedReplicas);
+    if(priorityQueues.get(priLevel).add(block)) {
+      if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
+          expectedReplicas == 1) {
+        corruptReplOneBlocks++;
+      }
+      NameNode.blockStateChangeLog.debug(
+          "BLOCK* NameSystem.LowRedundancyBlock.add: {}"
+              + " has only {} replicas and need {} replicas so is added to"
+              + " neededReconstructions at priority level {}",
+          block, curReplicas, expectedReplicas, priLevel);
+
+      return true;
+    }
+    return false;
+  }
+
+  /** Remove a block from a low redundancy queue. */
+  synchronized boolean remove(BlockInfo block,
+                              int oldReplicas,
+                              int oldReadOnlyReplicas,
+                              int decommissionedReplicas,
+                              int oldExpectedReplicas) {
+    final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
+        decommissionedReplicas, oldExpectedReplicas);
+    boolean removedBlock = remove(block, priLevel);
+    if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
+        oldExpectedReplicas == 1 &&
+        removedBlock) {
+      corruptReplOneBlocks--;
+      assert corruptReplOneBlocks >= 0 :
+          "Number of corrupt blocks with replication factor 1 " +
+              "should be non-negative";
+    }
+    return removedBlock;
+  }
+
+  /**
+   * Remove a block from the low redundancy queues.
+   *
+   * The priLevel parameter is a hint of which queue to query
+   * first: if negative or &gt;= {@link #LEVEL} this shortcutting
+   * is not attmpted.
+   *
+   * If the block is not found in the nominated queue, an attempt is made to
+   * remove it from all queues.
+   *
+   * <i>Warning:</i> This is not a synchronized method.
+   * @param block block to remove
+   * @param priLevel expected privilege level
+   * @return true if the block was found and removed from one of the priority
+   *         queues
+   */
+  boolean remove(BlockInfo block, int priLevel) {
+    if(priLevel >= 0 && priLevel < LEVEL
+        && priorityQueues.get(priLevel).remove(block)) {
+      NameNode.blockStateChangeLog.debug(
+          "BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block {}"
+              + " from priority queue {}",
+          block, priLevel);
+      return true;
+    } else {
+      // Try to remove the block from all queues if the block was
+      // not found in the queue for the given priority level.
+      for (int i = 0; i < LEVEL; i++) {
+        if (i != priLevel && priorityQueues.get(i).remove(block)) {
+          NameNode.blockStateChangeLog.debug(
+              "BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block" +
+                  " {} from priority queue {}", block, i);
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Recalculate and potentially update the priority level of a block.
+   *
+   * If the block priority has changed from before an attempt is made to
+   * remove it from the block queue. Regardless of whether or not the block
+   * is in the block queue of (recalculate) priority, an attempt is made
+   * to add it to that queue. This ensures that the block will be
+   * in its expected priority queue (and only that queue) by the end of the
+   * method call.
+   * @param block a low redundancy block
+   * @param curReplicas current number of replicas of the block
+   * @param decommissionedReplicas  the number of decommissioned replicas
+   * @param curExpectedReplicas expected number of replicas of the block
+   * @param curReplicasDelta the change in the replicate count from before
+   * @param expectedReplicasDelta the change in the expected replica count
+   *        from before
+   */
+  synchronized void update(BlockInfo block, int curReplicas,
+                           int readOnlyReplicas, int decommissionedReplicas,
+                           int curExpectedReplicas,
+                           int curReplicasDelta, int expectedReplicasDelta) {
+    int oldReplicas = curReplicas-curReplicasDelta;
+    int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+    int curPri = getPriority(block, curReplicas, readOnlyReplicas,
+        decommissionedReplicas, curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
+        decommissionedReplicas, oldExpectedReplicas);
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " +
+        block +
+        " curReplicas " + curReplicas +
+        " curExpectedReplicas " + curExpectedReplicas +
+        " oldReplicas " + oldReplicas +
+        " oldExpectedReplicas  " + oldExpectedReplicas +
+        " curPri  " + curPri +
+        " oldPri  " + oldPri);
+    }
+    if(oldPri != curPri) {
+      remove(block, oldPri);
+    }
+    if(priorityQueues.get(curPri).add(block)) {
+      NameNode.blockStateChangeLog.debug(
+          "BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} "
+              + "replicas and needs {} replicas so is added to "
+              + "neededReconstructions at priority level {}",
+          block, curReplicas, curExpectedReplicas, curPri);
+
+    }
+    if (oldPri != curPri || expectedReplicasDelta != 0) {
+      // corruptReplOneBlocks could possibly change
+      if (curPri == QUEUE_WITH_CORRUPT_BLOCKS &&
+          curExpectedReplicas == 1) {
+        // add a new corrupt block with replication factor 1
+        corruptReplOneBlocks++;
+      } else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS &&
+          curExpectedReplicas - expectedReplicasDelta == 1) {
+        // remove an existing corrupt block with replication factor 1
+        corruptReplOneBlocks--;
+      }
+    }
+  }
+
+  /**
+   * Get a list of block lists without sufficient redundancy. The index of
+   * block lists represents its replication priority. Iterates each block list
+   * in priority order beginning with the highest priority list. Iterators use
+   * a bookmark to resume where the previous iteration stopped. Returns when
+   * the block count is met or iteration reaches the end of the lowest priority
+   * list, in which case bookmarks for each block list are reset to the heads
+   * of their respective lists.
+   *
+   * @param blocksToProcess - number of blocks to fetch from low redundancy
+   *          blocks.
+   * @return Return a list of block lists to be replicated. The block list
+   *         index represents its redundancy priority.
+   */
+  synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
+      int blocksToProcess) {
+    final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
+
+    int count = 0;
+    int priority = 0;
+    for (; count < blocksToProcess && priority < LEVEL; priority++) {
+      if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
+        // do not choose corrupted blocks.
+        continue;
+      }
+
+      // Go through all blocks that need reconstructions with current priority.
+      // Set the iterator to the first unprocessed block at this priority level
+      final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
+      final List<BlockInfo> blocks = new LinkedList<>();
+      blocksToReconstruct.add(blocks);
+      // Loop through all remaining blocks in the list.
+      for(; count < blocksToProcess && i.hasNext(); count++) {
+        blocks.add(i.next());
+      }
+    }
+
+    if (priority == LEVEL) {
+      // Reset all bookmarks because there were no recently added blocks.
+      for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
+        q.resetBookmark();
+      }
+    }
+
+    return blocksToReconstruct;
+  }
+
+  /** Returns an iterator of all blocks in a given priority queue. */
+  synchronized Iterator<BlockInfo> iterator(int level) {
+    return priorityQueues.get(level).iterator();
+  }
+
+  /** Return an iterator of all the low redundancy blocks. */
+  @Override
+  public synchronized Iterator<BlockInfo> iterator() {
+    final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
+    return new Iterator<BlockInfo>() {
+      private Iterator<BlockInfo> b = q.next().iterator();
+
+      @Override
+      public BlockInfo next() {
+        hasNext();
+        return b.next();
+      }
+
+      @Override
+      public boolean hasNext() {
+        for(; !b.hasNext() && q.hasNext(); ) {
+          b = q.next().iterator();
+        }
+        return b.hasNext();
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
deleted file mode 100644
index 5e8f7ed..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
-
-/**
- * Keep prioritized queues of under replicated blocks.
- * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
- * indicating the highest priority.
- * </p>
- * Having a prioritised queue allows the {@link BlockManager} to select
- * which blocks to replicate first -it tries to give priority to data
- * that is most at risk or considered most valuable.
- *
- * <p/>
- * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
- * </p>
- * <p>The queue order is as follows:</p>
- * <ol>
- *   <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
- *   first. That is blocks with only one copy, or blocks with zero live
- *   copies but a copy in a node being decommissioned. These blocks
- *   are at risk of loss if the disk or server on which they
- *   remain fails.</li>
- *   <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
- *   under-replicated compared to their expected values. Currently
- *   that means the ratio of the ratio of actual:expected means that
- *   there is <i>less than</i> 1:3.</li>. These blocks may not be at risk,
- *   but they are clearly considered "important".
- *   <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
- *   replicated, and the ratio of actual:expected is good enough that
- *   they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
- *   queue.</li>
- *   <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as
- *   many copies of a block as required, but the blocks are not adequately
- *   distributed. Loss of a rack/switch could take all copies off-line.</li>
- *   <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
- *   and for which there are no-non-corrupt copies (currently) available.
- *   The policy here is to keep those corrupt blocks replicated, but give
- *   blocks that are not corrupt higher priority.</li>
- * </ol>
- */
-class UnderReplicatedBlocks implements Iterable<BlockInfo> {
-  /** The total number of queues : {@value} */
-  static final int LEVEL = 5;
-  /** The queue with the highest priority: {@value} */
-  static final int QUEUE_HIGHEST_PRIORITY = 0;
-  /** The queue for blocks that are way below their expected value : {@value} */
-  static final int QUEUE_VERY_UNDER_REPLICATED = 1;
-  /** The queue for "normally" under-replicated blocks: {@value} */
-  static final int QUEUE_UNDER_REPLICATED = 2;
-  /** The queue for blocks that have the right number of replicas,
-   * but which the block manager felt were badly distributed: {@value}
-   */
-  static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
-  /** The queue for corrupt blocks: {@value} */
-  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
-  /** the queues themselves */
-  private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues
-      = new ArrayList<>(LEVEL);
-
-  /** The number of corrupt blocks with replication factor 1 */
-  private int corruptReplOneBlocks = 0;
-
-  /** Create an object. */
-  UnderReplicatedBlocks() {
-    for (int i = 0; i < LEVEL; i++) {
-      priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
-    }
-  }
-
-  /**
-   * Empty the queues.
-   */
-  synchronized void clear() {
-    for (int i = 0; i < LEVEL; i++) {
-      priorityQueues.get(i).clear();
-    }
-    corruptReplOneBlocks = 0;
-  }
-
-  /** Return the total number of under replication blocks */
-  synchronized int size() {
-    int size = 0;
-    for (int i = 0; i < LEVEL; i++) {
-      size += priorityQueues.get(i).size();
-    }
-    return size;
-  }
-
-  /** Return the number of under replication blocks excluding corrupt blocks */
-  synchronized int getUnderReplicatedBlockCount() {
-    int size = 0;
-    for (int i = 0; i < LEVEL; i++) {
-      if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
-        size += priorityQueues.get(i).size();
-      }
-    }
-    return size;
-  }
-
-  /** Return the number of corrupt blocks */
-  synchronized int getCorruptBlockSize() {
-    return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
-  }
-
-  /** Return the number of corrupt blocks with replication factor 1 */
-  synchronized int getCorruptReplOneBlockSize() {
-    return corruptReplOneBlocks;
-  }
-
-  /** Check if a block is in the neededReplication queue */
-  synchronized boolean contains(BlockInfo block) {
-    for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
-      if (set.contains(block)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /** Return the priority of a block
-   * @param curReplicas current number of replicas of the block
-   * @param expectedReplicas expected number of replicas of the block
-   * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
-   */
-  private int getPriority(BlockInfo block,
-                          int curReplicas,
-                          int readOnlyReplicas,
-                          int decommissionedReplicas,
-                          int expectedReplicas) {
-    assert curReplicas >= 0 : "Negative replicas!";
-    if (curReplicas >= expectedReplicas) {
-      // Block has enough copies, but not enough racks
-      return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
-    }
-    if (block.isStriped()) {
-      BlockInfoStriped sblk = (BlockInfoStriped) block;
-      return getPriorityStriped(curReplicas, decommissionedReplicas,
-          sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
-    } else {
-      return getPriorityContiguous(curReplicas, readOnlyReplicas,
-          decommissionedReplicas, expectedReplicas);
-    }
-  }
-
-  private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
-      int decommissionedReplicas, int expectedReplicas) {
-    if (curReplicas == 0) {
-      // If there are zero non-decommissioned replicas but there are
-      // some decommissioned replicas, then assign them highest priority
-      if (decommissionedReplicas > 0) {
-        return QUEUE_HIGHEST_PRIORITY;
-      }
-      if (readOnlyReplicas > 0) {
-        // only has read-only replicas, highest risk
-        // since the read-only replicas may go down all together.
-        return QUEUE_HIGHEST_PRIORITY;
-      }
-      //all we have are corrupt blocks
-      return QUEUE_WITH_CORRUPT_BLOCKS;
-    } else if (curReplicas == 1) {
-      // only one replica, highest risk of loss
-      // highest priority
-      return QUEUE_HIGHEST_PRIORITY;
-    } else if ((curReplicas * 3) < expectedReplicas) {
-      //there is less than a third as many blocks as requested;
-      //this is considered very under-replicated
-      return QUEUE_VERY_UNDER_REPLICATED;
-    } else {
-      //add to the normal queue for under replicated blocks
-      return QUEUE_UNDER_REPLICATED;
-    }
-  }
-
-  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
-      short dataBlkNum, short parityBlkNum) {
-    if (curReplicas < dataBlkNum) {
-      // There are some replicas on decommissioned nodes so it's not corrupted
-      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
-        return QUEUE_HIGHEST_PRIORITY;
-      }
-      return QUEUE_WITH_CORRUPT_BLOCKS;
-    } else if (curReplicas == dataBlkNum) {
-      // highest risk of loss, highest priority
-      return QUEUE_HIGHEST_PRIORITY;
-    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
-      // there is less than a third as many blocks as requested;
-      // this is considered very under-replicated
-      return QUEUE_VERY_UNDER_REPLICATED;
-    } else {
-      // add to the normal queue for under replicated blocks
-      return QUEUE_UNDER_REPLICATED;
-    }
-  }
-
-  /** add a block to a under replication queue according to its priority
-   * @param block a under replication block
-   * @param curReplicas current number of replicas of the block
-   * @param decomissionedReplicas the number of decommissioned replicas
-   * @param expectedReplicas expected number of replicas of the block
-   * @return true if the block was added to a queue.
-   */
-  synchronized boolean add(BlockInfo block,
-                           int curReplicas,
-                           int readOnlyReplicas,
-                           int decomissionedReplicas,
-                           int expectedReplicas) {
-    assert curReplicas >= 0 : "Negative replicas!";
-    final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
-        decomissionedReplicas, expectedReplicas);
-    if(priorityQueues.get(priLevel).add(block)) {
-      if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
-          expectedReplicas == 1) {
-        corruptReplOneBlocks++;
-      }
-      NameNode.blockStateChangeLog.debug(
-          "BLOCK* NameSystem.UnderReplicationBlock.add: {}"
-              + " has only {} replicas and need {} replicas so is added to" +
-              " neededReplications at priority level {}", block, curReplicas,
-          expectedReplicas, priLevel);
-
-      return true;
-    }
-    return false;
-  }
-
-  /** remove a block from a under replication queue */
-  synchronized boolean remove(BlockInfo block,
-                              int oldReplicas,
-                              int oldReadOnlyReplicas,
-                              int decommissionedReplicas,
-                              int oldExpectedReplicas) {
-    final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
-        decommissionedReplicas, oldExpectedReplicas);
-    boolean removedBlock = remove(block, priLevel);
-    if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
-        oldExpectedReplicas == 1 &&
-        removedBlock) {
-      corruptReplOneBlocks--;
-      assert corruptReplOneBlocks >= 0 :
-          "Number of corrupt blocks with replication factor 1 " +
-              "should be non-negative";
-    }
-    return removedBlock;
-  }
-
-  /**
-   * Remove a block from the under replication queues.
-   *
-   * The priLevel parameter is a hint of which queue to query
-   * first: if negative or &gt;= {@link #LEVEL} this shortcutting
-   * is not attmpted.
-   *
-   * If the block is not found in the nominated queue, an attempt is made to
-   * remove it from all queues.
-   *
-   * <i>Warning:</i> This is not a synchronized method.
-   * @param block block to remove
-   * @param priLevel expected privilege level
-   * @return true if the block was found and removed from one of the priority queues
-   */
-  boolean remove(BlockInfo block, int priLevel) {
-    if(priLevel >= 0 && priLevel < LEVEL
-        && priorityQueues.get(priLevel).remove(block)) {
-      NameNode.blockStateChangeLog.debug(
-        "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
-            " from priority queue {}", block, priLevel);
-      return true;
-    } else {
-      // Try to remove the block from all queues if the block was
-      // not found in the queue for the given priority level.
-      for (int i = 0; i < LEVEL; i++) {
-        if (i != priLevel && priorityQueues.get(i).remove(block)) {
-          NameNode.blockStateChangeLog.debug(
-              "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
-                  " {} from priority queue {}", block, i);
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Recalculate and potentially update the priority level of a block.
-   *
-   * If the block priority has changed from before an attempt is made to
-   * remove it from the block queue. Regardless of whether or not the block
-   * is in the block queue of (recalculate) priority, an attempt is made
-   * to add it to that queue. This ensures that the block will be
-   * in its expected priority queue (and only that queue) by the end of the
-   * method call.
-   * @param block a under replicated block
-   * @param curReplicas current number of replicas of the block
-   * @param decommissionedReplicas  the number of decommissioned replicas
-   * @param curExpectedReplicas expected number of replicas of the block
-   * @param curReplicasDelta the change in the replicate count from before
-   * @param expectedReplicasDelta the change in the expected replica count from before
-   */
-  synchronized void update(BlockInfo block, int curReplicas,
-                           int readOnlyReplicas, int decommissionedReplicas,
-                           int curExpectedReplicas,
-                           int curReplicasDelta, int expectedReplicasDelta) {
-    int oldReplicas = curReplicas-curReplicasDelta;
-    int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(block, curReplicas, readOnlyReplicas,
-        decommissionedReplicas, curExpectedReplicas);
-    int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
-        decommissionedReplicas, oldExpectedReplicas);
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
-        block +
-        " curReplicas " + curReplicas +
-        " curExpectedReplicas " + curExpectedReplicas +
-        " oldReplicas " + oldReplicas +
-        " oldExpectedReplicas  " + oldExpectedReplicas +
-        " curPri  " + curPri +
-        " oldPri  " + oldPri);
-    }
-    if(oldPri != curPri) {
-      remove(block, oldPri);
-    }
-    if(priorityQueues.get(curPri).add(block)) {
-      NameNode.blockStateChangeLog.debug(
-          "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " +
-              "replicas and needs {} replicas so is added to " +
-              "neededReplications at priority level {}", block, curReplicas,
-          curExpectedReplicas, curPri);
-
-    }
-    if (oldPri != curPri || expectedReplicasDelta != 0) {
-      // corruptReplOneBlocks could possibly change
-      if (curPri == QUEUE_WITH_CORRUPT_BLOCKS &&
-          curExpectedReplicas == 1) {
-        // add a new corrupt block with replication factor 1
-        corruptReplOneBlocks++;
-      } else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS &&
-          curExpectedReplicas - expectedReplicasDelta == 1) {
-        // remove an existing corrupt block with replication factor 1
-        corruptReplOneBlocks--;
-      }
-    }
-  }
-  
-  /**
-   * Get a list of block lists to be replicated. The index of block lists
-   * represents its replication priority. Iterates each block list in priority
-   * order beginning with the highest priority list. Iterators use a bookmark to
-   * resume where the previous iteration stopped. Returns when the block count
-   * is met or iteration reaches the end of the lowest priority list, in which
-   * case bookmarks for each block list are reset to the heads of their
-   * respective lists.
-   *
-   * @param blocksToProcess - number of blocks to fetch from underReplicated
-   *          blocks.
-   * @return Return a list of block lists to be replicated. The block list index
-   *         represents its replication priority.
-   */
-  synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
-      int blocksToProcess) {
-    final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
-    
-    int count = 0;
-    int priority = 0;
-    for (; count < blocksToProcess && priority < LEVEL; priority++) {
-      if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
-        // do not choose corrupted blocks.
-        continue;
-      }
-
-      // Go through all blocks that need replications with current priority.
-      // Set the iterator to the first unprocessed block at this priority level.
-      final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
-      final List<BlockInfo> blocks = new LinkedList<>();
-      blocksToReplicate.add(blocks);
-      // Loop through all remaining blocks in the list.
-      for(; count < blocksToProcess && i.hasNext(); count++) {
-        blocks.add(i.next());
-      }
-    }
-
-    if (priority == LEVEL) {
-      // Reset all bookmarks because there were no recently added blocks.
-      for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
-        q.resetBookmark();
-      }
-    }
-
-    return blocksToReplicate;
-  }
-
-  /** returns an iterator of all blocks in a given priority queue */
-  synchronized Iterator<BlockInfo> iterator(int level) {
-    return priorityQueues.get(level).iterator();
-  }
-
-  /** return an iterator of all the under replication blocks */
-  @Override
-  public synchronized Iterator<BlockInfo> iterator() {
-    final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
-    return new Iterator<BlockInfo>() {
-      private Iterator<BlockInfo> b = q.next().iterator();
-
-      @Override
-      public BlockInfo next() {
-        hasNext();
-        return b.next();
-      }
-
-      @Override
-      public boolean hasNext() {
-        for(; !b.hasNext() && q.hasNext(); ) {
-          b = q.next().iterator();
-        }
-        return b.hasNext();
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index c0a4fdb..1b565ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -74,7 +74,7 @@ public class BlockManagerTestUtil {
       final BlockInfo storedBlock = bm.getStoredBlock(b);
       return new int[]{getNumberOfRacks(bm, b),
           bm.countNodes(storedBlock).liveReplicas(),
-          bm.neededReplications.contains(storedBlock) ? 1 : 0};
+          bm.neededReconstruction.contains(storedBlock) ? 1 : 0};
     } finally {
       namesystem.readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 5511b99..3a974e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -397,20 +397,20 @@ public class TestBlockManager {
     addNodes(nodes);
     List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
     for (int i = 0; i < NUM_TEST_ITERS; i++) {
-      doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
+      doTestSingleRackClusterHasSufficientRedundancy(i, origNodes);
     }
   }
   
-  private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
+  private void doTestSingleRackClusterHasSufficientRedundancy(int testIndex,
       List<DatanodeDescriptor> origNodes)
       throws Exception {
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
     BlockInfo block = addBlockOnNodes(testIndex, origNodes);
-    assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block)));
+    assertFalse(bm.isNeededReconstruction(block, bm.countLiveNodes(block)));
   }
   
   @Test(timeout = 60000)
-  public void testNeededReplicationWhileAppending() throws IOException {
+  public void testNeededReconstructionWhileAppending() throws IOException {
     Configuration conf = new HdfsConfiguration();
     String src = "/test-file";
     Path file = new Path(src);
@@ -449,7 +449,7 @@ public class TestBlockManager {
         namenode.updatePipeline(clientName, oldBlock, newBlock,
             newLocatedBlock.getLocations(), newLocatedBlock.getStorageIDs());
         BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
-        assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi)));
+        assertFalse(bm.isNeededReconstruction(bi, bm.countLiveNodes(bi)));
       } finally {
         IOUtils.closeStream(out);
       }
@@ -601,7 +601,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
+            LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
 
     assertEquals("Does not choose a source node for a less-than-highest-priority"
             + " replication since all available source nodes have reached"
@@ -612,7 +612,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
-            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
+            LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length);
 
     // Increase the replication count to test replication count > hard limit
     DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
@@ -626,7 +626,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
+            LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
   }
 
   @Test
@@ -652,7 +652,7 @@ public class TestBlockManager {
             cntNodes,
             liveNodes,
             new NumberReplicas(), new LinkedList<Byte>(),
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
+            LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]);
 
 
     // Increase the replication count to test replication count > hard limit
@@ -666,7 +666,7 @@ public class TestBlockManager {
             cntNodes,
             liveNodes,
             new NumberReplicas(), new LinkedList<Byte>(),
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
+            LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
new file mode 100644
index 0000000..2eb7abf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class TestLowRedundancyBlockQueues {
+
+  private final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+
+  private BlockInfo genBlockInfo(long id) {
+    return new BlockInfoContiguous(new Block(id), (short) 3);
+  }
+
+  private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+    BlockInfoStriped sblk =  new BlockInfoStriped(new Block(id), ecPolicy);
+    sblk.setNumBytes(numBytes);
+    return sblk;
+  }
+
+  /**
+   * Test that adding blocks with different replication counts puts them
+   * into different queues
+   * @throws Throwable if something goes wrong
+   */
+  @Test
+  public void testBlockPriorities() throws Throwable {
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    BlockInfo block1 = genBlockInfo(1);
+    BlockInfo block2 = genBlockInfo(2);
+    BlockInfo block_very_low_redundancy = genBlockInfo(3);
+    BlockInfo block_corrupt = genBlockInfo(4);
+    BlockInfo block_corrupt_repl_one = genBlockInfo(5);
+
+    //add a block with a single entry
+    assertAdded(queues, block1, 1, 0, 3);
+
+    assertEquals(1, queues.getLowRedundancyBlockCount());
+    assertEquals(1, queues.size());
+    assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+    //repeated additions fail
+    assertFalse(queues.add(block1, 1, 0, 0, 3));
+
+    //add a second block with two replicas
+    assertAdded(queues, block2, 2, 0, 3);
+    assertEquals(2, queues.getLowRedundancyBlockCount());
+    assertEquals(2, queues.size());
+    assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
+    //now try to add a block that is corrupt
+    assertAdded(queues, block_corrupt, 0, 0, 3);
+    assertEquals(3, queues.size());
+    assertEquals(2, queues.getLowRedundancyBlockCount());
+    assertEquals(1, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+                  LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+
+    //insert a very insufficiently redundancy block
+    assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
+    assertInLevel(queues, block_very_low_redundancy,
+                  LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+
+    //insert a corrupt block with replication factor 1
+    assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
+    assertEquals(2, queues.getCorruptBlockSize());
+    assertEquals(1, queues.getCorruptReplOneBlockSize());
+    queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
+    assertEquals(0, queues.getCorruptReplOneBlockSize());
+    queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
+    assertEquals(1, queues.getCorruptReplOneBlockSize());
+    queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
+    assertEquals(2, queues.getCorruptReplOneBlockSize());
+  }
+
+  @Test
+  public void testStripedBlockPriorities() throws Throwable {
+    int dataBlkNum = ecPolicy.getNumDataUnits();
+    int parityBlkNUm = ecPolicy.getNumParityUnits();
+    doTestStripedBlockPriorities(1, parityBlkNUm);
+    doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm);
+  }
+
+  private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+      throws Throwable {
+    int groupSize = dataBlkNum + parityBlkNum;
+    long numBytes = ecPolicy.getCellSize() * dataBlkNum;
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    int numUR = 0;
+    int numCorrupt = 0;
+
+    // add low redundancy blocks
+    for (int i = 0; dataBlkNum + i < groupSize; i++) {
+      BlockInfo block = genStripedBlockInfo(-100 - 100 * i, numBytes);
+      assertAdded(queues, block, dataBlkNum + i, 0, groupSize);
+      numUR++;
+      assertEquals(numUR, queues.getLowRedundancyBlockCount());
+      assertEquals(numUR + numCorrupt, queues.size());
+      if (i == 0) {
+        assertInLevel(queues, block,
+            LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+      } else if (i * 3 < parityBlkNum + 1) {
+        assertInLevel(queues, block,
+            LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+      } else {
+        assertInLevel(queues, block,
+            LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
+      }
+    }
+
+    // add a corrupted block
+    BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
+    assertEquals(numCorrupt, queues.getCorruptBlockSize());
+    assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+    numCorrupt++;
+    assertEquals(numUR + numCorrupt, queues.size());
+    assertEquals(numUR, queues.getLowRedundancyBlockCount());
+    assertEquals(numCorrupt, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+        LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+  }
+
+  private void assertAdded(LowRedundancyBlocks queues,
+                           BlockInfo block,
+                           int curReplicas,
+                           int decomissionedReplicas,
+                           int expectedReplicas) {
+    assertTrue("Failed to add " + block,
+               queues.add(block,
+                          curReplicas, 0,
+                          decomissionedReplicas,
+                          expectedReplicas));
+  }
+
+  /**
+   * Determine whether or not a block is in a level without changing the API.
+   * Instead get the per-level iterator and run though it looking for a match.
+   * If the block is not found, an assertion is thrown.
+   *
+   * This is inefficient, but this is only a test case.
+   * @param queues queues to scan
+   * @param block block to look for
+   * @param level level to select
+   */
+  private void assertInLevel(LowRedundancyBlocks queues,
+                             Block block,
+                             int level) {
+    final Iterator<BlockInfo> bi = queues.iterator(level);
+    while (bi.hasNext()) {
+      Block next = bi.next();
+      if (block.equals(next)) {
+        return;
+      }
+    }
+    fail("Block " + block + " not found in level " + level);
+  }
+}