You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/13 00:04:33 UTC

[14/49] hadoop git commit: HDFS-7411. Change decommission logic to throttle by blocks rather than nodes in each interval. Contributed by Andrew Wang

HDFS-7411. Change decommission logic to throttle by blocks rather
than nodes in each interval. Contributed by Andrew Wang


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ee0d32b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ee0d32b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ee0d32b

Branch: refs/heads/YARN-2928
Commit: 6ee0d32b98bc3aa5ed42859f1325d5a14fd1722a
Parents: 7ce3c76
Author: Chris Douglas <cd...@apache.org>
Authored: Sun Mar 8 18:31:04 2015 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Sun Mar 8 18:31:04 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   6 +-
 .../apache/hadoop/hdfs/HdfsConfiguration.java   |   2 +-
 .../server/blockmanagement/BlockManager.java    | 121 +---
 .../server/blockmanagement/DatanodeManager.java | 109 +---
 .../blockmanagement/DecommissionManager.java    | 619 +++++++++++++++++--
 .../src/main/resources/hdfs-default.xml         |  23 +-
 .../apache/hadoop/hdfs/TestDecommission.java    | 412 ++++++++----
 .../blockmanagement/BlockManagerTestUtil.java   |   8 +-
 .../TestReplicationPolicyConsiderLoad.java      |   2 +-
 .../namenode/TestDecommissioningStatus.java     |  59 +-
 .../hadoop/hdfs/server/namenode/TestFsck.java   |   2 +-
 .../namenode/TestNamenodeCapacityReport.java    |   4 +-
 13 files changed, 996 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 29717e1..3cd6372 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -719,6 +719,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo bia jing9)
 
+    HDFS-7411. Change decommission logic to throttle by blocks rather than
+    nodes in each interval. (Andrew Wang via cdouglas)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 975f023..1e864bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -453,8 +453,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L;
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
   public static final int     DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
-  public static final String  DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";
-  public static final int     DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT = 5;
+  public static final String  DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY = "dfs.namenode.decommission.blocks.per.interval";
+  public static final int     DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT = 500000;
+  public static final String  DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES = "dfs.namenode.decommission.max.concurrent.tracked.nodes";
+  public static final int     DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
   public static final String  DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
   public static final int     DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
index 8f2966a..29a2667 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
@@ -139,7 +139,7 @@ public class HdfsConfiguration extends Configuration {
       new DeprecationDelta("dfs.federation.nameservice.id",
         DFSConfigKeys.DFS_NAMESERVICE_ID),
       new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
-        DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS)
+        DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
     });
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 58a8b94..c1a3e05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3188,28 +3188,6 @@ public class BlockManager {
     }
     return live;
   }
-
-  private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
-      NumberReplicas num) {
-    int curReplicas = num.liveReplicas();
-    int curExpectedReplicas = getReplication(block);
-    BlockCollection bc = blocksMap.getBlockCollection(block);
-    StringBuilder nodeList = new StringBuilder();
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      nodeList.append(node);
-      nodeList.append(" ");
-    }
-    LOG.info("Block: " + block + ", Expected Replicas: "
-        + curExpectedReplicas + ", live replicas: " + curReplicas
-        + ", corrupt replicas: " + num.corruptReplicas()
-        + ", decommissioned replicas: " + num.decommissionedReplicas()
-        + ", excess replicas: " + num.excessReplicas()
-        + ", Is Open File: " + bc.isUnderConstruction()
-        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
-        + srcNode + ", Is current datanode decommissioning: "
-        + srcNode.isDecommissionInProgress());
-  }
   
   /**
    * On stopping decommission, check if the node has excess replicas.
@@ -3240,89 +3218,30 @@ public class BlockManager {
   }
 
   /**
-   * Return true if there are any blocks on this node that have not
-   * yet reached their replication factor. Otherwise returns false.
+   * Returns whether a node can be safely decommissioned based on its 
+   * liveness. Dead nodes cannot always be safely decommissioned.
    */
-  boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
-    boolean status = false;
-    boolean firstReplicationLog = true;
-    int underReplicatedBlocks = 0;
-    int decommissionOnlyReplicas = 0;
-    int underReplicatedInOpenFiles = 0;
-    final Iterator<? extends Block> it = srcNode.getBlockIterator();
-    while(it.hasNext()) {
-      final Block block = it.next();
-      BlockCollection bc = blocksMap.getBlockCollection(block);
-
-      if (bc != null) {
-        NumberReplicas num = countNodes(block);
-        int curReplicas = num.liveReplicas();
-        int curExpectedReplicas = getReplication(block);
-                
-        if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
-          if (curExpectedReplicas > curReplicas) {
-            if (bc.isUnderConstruction()) {
-              if (block.equals(bc.getLastBlock()) && curReplicas > minReplication) {
-                continue;
-              }
-              underReplicatedInOpenFiles++;
-            }
-            
-            // Log info about one block for this node which needs replication
-            if (!status) {
-              status = true;
-              if (firstReplicationLog) {
-                logBlockReplicationInfo(block, srcNode, num);
-              }
-              // Allowing decommission as long as default replication is met
-              if (curReplicas >= defaultReplication) {
-                status = false;
-                firstReplicationLog = false;
-              }
-            }
-            underReplicatedBlocks++;
-            if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
-              decommissionOnlyReplicas++;
-            }
-          }
-          if (!neededReplications.contains(block) &&
-            pendingReplications.getNumReplicas(block) == 0 &&
-            namesystem.isPopulatingReplQueues()) {
-            //
-            // These blocks have been reported from the datanode
-            // after the startDecommission method has been executed. These
-            // blocks were in flight when the decommissioning was started.
-            // Process these blocks only when active NN is out of safe mode.
-            //
-            neededReplications.add(block,
-                                   curReplicas,
-                                   num.decommissionedReplicas(),
-                                   curExpectedReplicas);
-          }
-        }
-      }
+  boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+    if (node.isAlive) {
+      return true;
     }
 
-    if (!status && !srcNode.isAlive) {
-      updateState();
-      if (pendingReplicationBlocksCount == 0 &&
-          underReplicatedBlocksCount == 0) {
-        LOG.info("srcNode {} is dead and there are no under-replicated" +
-            " blocks or blocks pending replication. Marking as " +
-            "decommissioned.");
-      } else {
-        LOG.warn("srcNode " + srcNode + " is dead " +
-            "while decommission is in progress. Continuing to mark " +
-            "it as decommission in progress so when it rejoins the " +
-            "cluster it can continue the decommission process.");
-        status = true;
-      }
+    updateState();
+    if (pendingReplicationBlocksCount == 0 &&
+        underReplicatedBlocksCount == 0) {
+      LOG.info("Node {} is dead and there are no under-replicated" +
+          " blocks or blocks pending replication. Safe to decommission.", 
+          node);
+      return true;
     }
 
-    srcNode.decommissioningStatus.set(underReplicatedBlocks,
-        decommissionOnlyReplicas, 
-        underReplicatedInOpenFiles);
-    return status;
+    LOG.warn("Node {} is dead " +
+        "while decommission is in progress. Cannot be safely " +
+        "decommissioned since there is risk of reduced " +
+        "data durability or data loss. Either restart the failed node or" +
+        " force decommissioning by removing, calling refreshNodes, " +
+        "then re-adding to the excludes files.", node);
+    return false;
   }
 
   public int getActiveBlockCount() {
@@ -3493,7 +3412,7 @@ public class BlockManager {
    * A block needs replication if the number of replicas is less than expected
    * or if it does not have enough racks.
    */
-  private boolean isNeededReplication(Block b, int expected, int current) {
+  boolean isNeededReplication(Block b, int expected, int current) {
     return current < expected || !blockHasEnoughRacks(b);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 45c56a8..9179ff0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.util.CyclicIteration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
@@ -53,8 +52,6 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
-import static org.apache.hadoop.util.Time.now;
-
 /**
  * Manage datanodes, include decommission and other activities.
  */
@@ -65,9 +62,9 @@ public class DatanodeManager {
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
+  private final DecommissionManager decomManager;
   private final HeartbeatManager heartbeatManager;
   private final FSClusterStats fsClusterStats;
-  private Daemon decommissionthread = null;
 
   /**
    * Stores the datanode -> block map.  
@@ -110,7 +107,7 @@ public class DatanodeManager {
   private final HostFileManager hostFileManager = new HostFileManager();
 
   /** The period to wait for datanode heartbeat.*/
-  private final long heartbeatExpireInterval;
+  private long heartbeatExpireInterval;
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
 
@@ -184,6 +181,8 @@ public class DatanodeManager {
     networktopology = NetworkTopology.getInstance(conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
+    this.decomManager = new DecommissionManager(namesystem, blockManager,
+        heartbeatManager);
     this.fsClusterStats = newFSClusterStats();
 
     this.defaultXferPort = NetUtils.createSocketAddr(
@@ -307,25 +306,12 @@ public class DatanodeManager {
   }
   
   void activate(final Configuration conf) {
-    final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
-    this.decommissionthread = new Daemon(dm.new Monitor(
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY, 
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
-    decommissionthread.start();
-
+    decomManager.activate(conf);
     heartbeatManager.activate(conf);
   }
 
   void close() {
-    if (decommissionthread != null) {
-      decommissionthread.interrupt();
-      try {
-        decommissionthread.join(3000);
-      } catch (InterruptedException e) {
-      }
-    }
+    decomManager.close();
     heartbeatManager.close();
   }
 
@@ -340,6 +326,20 @@ public class DatanodeManager {
   }
 
   @VisibleForTesting
+  public DecommissionManager getDecomManager() {
+    return decomManager;
+  }
+
+  HostFileManager getHostFileManager() {
+    return hostFileManager;
+  }
+
+  @VisibleForTesting
+  public void setHeartbeatExpireInterval(long expiryMs) {
+    this.heartbeatExpireInterval = expiryMs;
+  }
+
+  @VisibleForTesting
   public FSClusterStats getFSClusterStats() {
     return fsClusterStats;
   }
@@ -826,63 +826,14 @@ public class DatanodeManager {
   }
 
   /**
-   * Decommission the node if it is in exclude list.
+   * Decommission the node if it is in the host exclude list.
+   *
+   * @param nodeReg datanode
    */
-  private void checkDecommissioning(DatanodeDescriptor nodeReg) { 
+  void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
     // If the registered node is in exclude list, then decommission it
-    if (hostFileManager.isExcluded(nodeReg)) {
-      startDecommission(nodeReg);
-    }
-  }
-
-  /**
-   * Change, if appropriate, the admin state of a datanode to 
-   * decommission completed. Return true if decommission is complete.
-   */
-  boolean checkDecommissionState(DatanodeDescriptor node) {
-    // Check to see if all blocks in this decommissioned
-    // node has reached their target replication factor.
-    if (node.isDecommissionInProgress() && node.checkBlockReportReceived()) {
-      if (!blockManager.isReplicationInProgress(node)) {
-        node.setDecommissioned();
-        LOG.info("Decommission complete for " + node);
-      }
-    }
-    return node.isDecommissioned();
-  }
-
-  /** Start decommissioning the specified datanode. */
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  public void startDecommission(DatanodeDescriptor node) {
-    if (!node.isDecommissionInProgress()) {
-      if (!node.isAlive) {
-        LOG.info("Dead node " + node + " is decommissioned immediately.");
-        node.setDecommissioned();
-      } else if (!node.isDecommissioned()) {
-        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
-          LOG.info("Start Decommissioning " + node + " " + storage
-              + " with " + storage.numBlocks() + " blocks");
-        }
-        heartbeatManager.startDecommission(node);
-        node.decommissioningStatus.setStartTime(now());
-
-        // all the blocks that reside on this node have to be replicated.
-        checkDecommissionState(node);
-      }
-    }
-  }
-
-  /** Stop decommissioning the specified datanodes. */
-  void stopDecommission(DatanodeDescriptor node) {
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      LOG.info("Stop Decommissioning " + node);
-      heartbeatManager.stopDecommission(node);
-      // Over-replicated blocks will be detected and processed when 
-      // the dead node comes back and send in its full block report.
-      if (node.isAlive) {
-        blockManager.processOverReplicatedBlocksOnReCommission(node);
-      }
+    if (getHostFileManager().isExcluded(nodeReg)) {
+      decomManager.startDecommission(nodeReg);
     }
   }
 
@@ -993,7 +944,7 @@ public class DatanodeManager {
           // also treat the registration message as a heartbeat
           heartbeatManager.register(nodeS);
           incrementVersionCount(nodeS.getSoftwareVersion());
-          checkDecommissioning(nodeS);
+          startDecommissioningIfExcluded(nodeS);
           success = true;
         } finally {
           if (!success) {
@@ -1029,7 +980,7 @@ public class DatanodeManager {
         // because its is done when the descriptor is created
         heartbeatManager.addDatanode(nodeDescr);
         incrementVersionCount(nodeReg.getSoftwareVersion());
-        checkDecommissioning(nodeDescr);
+        startDecommissioningIfExcluded(nodeDescr);
         success = true;
       } finally {
         if (!success) {
@@ -1092,9 +1043,9 @@ public class DatanodeManager {
         node.setDisallowed(true); // case 2.
       } else {
         if (hostFileManager.isExcluded(node)) {
-          startDecommission(node); // case 3.
+          decomManager.startDecommission(node); // case 3.
         } else {
-          stopDecommission(node); // case 4.
+          decomManager.stopDecommission(node); // case 4.
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index a234cf5..71c88f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -17,88 +17,605 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.AbstractList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.util.ChunkedArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.util.Time.now;
 
 /**
- * Manage node decommissioning.
+ * Manages datanode decommissioning. A background monitor thread 
+ * periodically checks the status of datanodes that are in the process of
+ * decommissioning.
+ * <p/>
+ * A datanode can be decommissioned in a few situations:
+ * <ul>
+ * <li>If a DN is dead, it is decommissioned immediately.</li>
+ * <li>If a DN is alive, it is decommissioned after all of its blocks 
+ * are sufficiently replicated. Merely under-replicated blocks do not 
+ * block decommissioning as long as they are above a replication 
+ * threshold.</li>
+ * </ul>
+ * In the second case, the datanode transitions to a 
+ * decommission-in-progress state and is tracked by the monitor thread. The 
+ * monitor periodically scans through the list of insufficiently replicated
+ * blocks on these datanodes to 
+ * determine if they can be decommissioned. The monitor also prunes this list 
+ * as blocks become replicated, so monitor scans will become more efficient 
+ * over time.
+ * <p/>
+ * Decommission-in-progress nodes that become dead do not progress to 
+ * decommissioned until they become live again. This prevents potential 
+ * durability loss for singly-replicated blocks (see HDFS-6791).
+ * <p/>
+ * This class depends on the FSNamesystem lock for synchronization.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
-class DecommissionManager {
-  static final Log LOG = LogFactory.getLog(DecommissionManager.class);
+public class DecommissionManager {
+  private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager
+      .class);
 
   private final Namesystem namesystem;
-  private final BlockManager blockmanager;
+  private final BlockManager blockManager;
+  private final HeartbeatManager hbManager;
+  private final ScheduledExecutorService executor;
+
+  /**
+   * Map containing the decommission-in-progress datanodes that are being
+   * tracked so they can be marked as decommissioned.
+   * <p/>
+   * This holds a set of references to the under-replicated blocks on the DN at
+   * the time the DN is added to the map, i.e. the blocks that are preventing
+   * the node from being marked as decommissioned. During a monitor tick, this
+   * list is pruned as blocks become replicated.
+   * <p/>
+   * Note also that the reference to the list of under-replicated blocks 
+   * will be null on initial add.
+   * <p/>
+   * However, this map can become out-of-date since it is not updated by block
+   * reports or other events. Before finally being marked as decommissioned,
+   * another check is done with the actual block map.
+   */
+  private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+      decomNodeBlocks;
+
+  /**
+   * Tracking a node in decomNodeBlocks consumes additional memory. To limit
+   * the impact on NN memory consumption, we limit the number of nodes in 
+   * decomNodeBlocks. Additional nodes wait in pendingNodes.
+   */
+  private final Queue<DatanodeDescriptor> pendingNodes;
+
+  private Monitor monitor = null;
 
   DecommissionManager(final Namesystem namesystem,
-      final BlockManager blockmanager) {
+      final BlockManager blockManager, final HeartbeatManager hbManager) {
     this.namesystem = namesystem;
-    this.blockmanager = blockmanager;
+    this.blockManager = blockManager;
+    this.hbManager = hbManager;
+
+    executor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
+            .setDaemon(true).build());
+    decomNodeBlocks = new TreeMap<>();
+    pendingNodes = new LinkedList<>();
+  }
+
+  /**
+   * Start the decommission monitor thread.
+   * @param conf
+   */
+  void activate(Configuration conf) {
+    final int intervalSecs =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+            DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT);
+    checkArgument(intervalSecs >= 0, "Cannot set a negative " +
+        "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
+
+    // By default, the new configuration key overrides the deprecated one.
+    // No # node limit is set.
+    int blocksPerInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
+    int nodesPerInterval = Integer.MAX_VALUE;
+
+    // If the expected key isn't present and the deprecated one is, 
+    // use the deprecated one in place of the new one. This overrides the
+    // default.
+    //
+    // Also print a deprecation warning.
+    final String deprecatedKey =
+        "dfs.namenode.decommission.nodes.per.interval";
+    final String strNodes = conf.get(deprecatedKey);
+    if (strNodes != null) {
+      nodesPerInterval = Integer.parseInt(strNodes);
+      blocksPerInterval = Integer.MAX_VALUE;
+      LOG.warn("Using deprecated configuration key {} value of {}.",
+          deprecatedKey, nodesPerInterval); 
+      LOG.warn("Please update your configuration to use {} instead.", 
+          DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+    }
+    checkArgument(blocksPerInterval > 0,
+        "Must set a positive value for "
+        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+
+    final int maxConcurrentTrackedNodes = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
+    checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
+        "value for "
+        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
+
+    monitor = new Monitor(blocksPerInterval, 
+        nodesPerInterval, maxConcurrentTrackedNodes);
+    executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
+        TimeUnit.SECONDS);
+
+    LOG.debug("Activating DecommissionManager with interval {} seconds, " +
+            "{} max blocks per interval, {} max nodes per interval, " +
+            "{} max concurrently tracked nodes.", intervalSecs,
+        blocksPerInterval, nodesPerInterval, maxConcurrentTrackedNodes);
+  }
+
+  /**
+   * Stop the decommission monitor thread, waiting briefly for it to terminate.
+   */
+  void close() {
+    executor.shutdownNow();
+    try {
+      executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {}
+  }
+
+  /**
+   * Start decommissioning the specified datanode. 
+   * @param node
+   */
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
+    if (!node.isDecommissionInProgress()) {
+      if (!node.isAlive) {
+        LOG.info("Dead node {} is decommissioned immediately.", node);
+        node.setDecommissioned();
+      } else if (!node.isDecommissioned()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Starting decommission of {} {} with {} blocks", 
+              node, storage, storage.numBlocks());
+        }
+        // Update DN stats maintained by HeartbeatManager
+        hbManager.startDecommission(node);
+        node.decommissioningStatus.setStartTime(now());
+        pendingNodes.add(node);
+      }
+    } else {
+      LOG.trace("startDecommission: Node {} is already decommission in "
+              + "progress, nothing to do.", node);
+    }
+  }
+
+  /**
+   * Stop decommissioning the specified datanode. 
+   * @param node
+   */
+  void stopDecommission(DatanodeDescriptor node) {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stopping decommissioning of node {}", node);
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.stopDecommission(node);
+      // Over-replicated blocks will be detected and processed when 
+      // the dead node comes back and sends in its full block report.
+      if (node.isAlive) {
+        blockManager.processOverReplicatedBlocksOnReCommission(node);
+      }
+      // Remove from tracking in DecommissionManager
+      pendingNodes.remove(node);
+      decomNodeBlocks.remove(node);
+    } else {
+      LOG.trace("stopDecommission: Node {} is not decommission in progress " +
+          "or decommissioned, nothing to do.", node);
+    }
+  }
+
+  private void setDecommissioned(DatanodeDescriptor dn) {
+    dn.setDecommissioned();
+    LOG.info("Decommissioning complete for node {}", dn);
   }
 
-  /** Periodically check decommission status. */
-  class Monitor implements Runnable {
-    /** recheckInterval is how often namenode checks
-     *  if a node has finished decommission
+  /**
+   * Checks whether a block is sufficiently replicated for decommissioning.
+   * Full-strength replication is not always necessary, hence "sufficient".
+   * @return true if sufficient, else false.
+   */
+  private boolean isSufficientlyReplicated(BlockInfoContiguous block, 
+      BlockCollection bc,
+      NumberReplicas numberReplicas) {
+    final int numExpected = bc.getBlockReplication();
+    final int numLive = numberReplicas.liveReplicas();
+    if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
+      // Block doesn't need replication. Skip.
+      LOG.trace("Block {} does not need replication.", block);
+      return true;
+    }
+
+    // Block is under-replicated
+    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, 
+        numLive);
+    if (numExpected > numLive) {
+      if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
+        // Can decom a UC block as long as there will still be minReplicas
+        if (numLive >= blockManager.minReplication) {
+          LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+              + ">= minR ({})", block, numLive, blockManager.minReplication);
+          return true;
+        } else {
+          LOG.trace("UC block {} insufficiently-replicated since numLive "
+              + "({}) < minR ({})", block, numLive,
+              blockManager.minReplication);
+        }
+      } else {
+        // Can decom a non-UC as long as the default replication is met
+        if (numLive >= blockManager.defaultReplication) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private static void logBlockReplicationInfo(Block block, BlockCollection bc,
+      DatanodeDescriptor srcNode, NumberReplicas num,
+      Iterable<DatanodeStorageInfo> storages) {
+    int curReplicas = num.liveReplicas();
+    int curExpectedReplicas = bc.getBlockReplication();
+    StringBuilder nodeList = new StringBuilder();
+    for (DatanodeStorageInfo storage : storages) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+      nodeList.append(node);
+      nodeList.append(" ");
+    }
+    LOG.info("Block: " + block + ", Expected Replicas: "
+        + curExpectedReplicas + ", live replicas: " + curReplicas
+        + ", corrupt replicas: " + num.corruptReplicas()
+        + ", decommissioned replicas: " + num.decommissionedReplicas()
+        + ", excess replicas: " + num.excessReplicas()
+        + ", Is Open File: " + bc.isUnderConstruction()
+        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+        + srcNode + ", Is current datanode decommissioning: "
+        + srcNode.isDecommissionInProgress());
+  }
+
+  @VisibleForTesting
+  public int getNumPendingNodes() {
+    return pendingNodes.size();
+  }
+
+  @VisibleForTesting
+  public int getNumTrackedNodes() {
+    return decomNodeBlocks.size();
+  }
+
+  @VisibleForTesting
+  public int getNumNodesChecked() {
+    return monitor.numNodesChecked;
+  }
+
+  /**
+   * Checks to see if DNs have finished decommissioning.
+   * <p/>
+   * Since this is done while holding the namesystem lock, 
+   * the amount of work per monitor tick is limited.
+   */
+  private class Monitor implements Runnable {
+    /**
+     * The maximum number of blocks to check per tick.
+     */
+    private final int numBlocksPerCheck;
+    /**
+     * The maximum number of nodes to check per tick.
      */
-    private final long recheckInterval;
-    /** The number of decommission nodes to check for each interval */
     private final int numNodesPerCheck;
-    /** firstkey can be initialized to anything. */
-    private String firstkey = "";
+    /**
+     * The maximum number of nodes to track in decomNodeBlocks. A value of 0
+     * means no limit.
+     */
+    private final int maxConcurrentTrackedNodes;
+    /**
+     * The number of blocks that have been checked on this tick.
+     */
+    private int numBlocksChecked = 0;
+    /**
+     * The number of nodes that have been checked on this tick. Used for 
+     * testing.
+     */
+    private int numNodesChecked = 0;
+    /**
+     * The last datanode in decomNodeBlocks that we've processed
+     */
+    private DatanodeDescriptor iterkey = new DatanodeDescriptor(new 
+        DatanodeID("", "", "", 0, 0, 0, 0));
 
-    Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
-      this.recheckInterval = recheckIntervalInSecond * 1000L;
+    Monitor(int numBlocksPerCheck, int numNodesPerCheck, int 
+        maxConcurrentTrackedNodes) {
+      this.numBlocksPerCheck = numBlocksPerCheck;
       this.numNodesPerCheck = numNodesPerCheck;
+      this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
+    }
+
+    private boolean exceededNumBlocksPerCheck() {
+      LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
+      return numBlocksChecked >= numBlocksPerCheck;
+    }
+
+    @Deprecated
+    private boolean exceededNumNodesPerCheck() {
+      LOG.trace("Processed {} nodes so far this tick", numNodesChecked);
+      return numNodesChecked >= numNodesPerCheck;
     }
 
-    /**
-     * Check decommission status of numNodesPerCheck nodes
-     * for every recheckInterval milliseconds.
-     */
     @Override
     public void run() {
-      for(; namesystem.isRunning(); ) {
-        namesystem.writeLock();
-        try {
-          check();
-        } finally {
-          namesystem.writeUnlock();
+      if (!namesystem.isRunning()) {
+        LOG.info("Namesystem is not running, skipping decommissioning checks"
+            + ".");
+        return;
+      }
+      // Reset the checked count at beginning of each iteration
+      numBlocksChecked = 0;
+      numNodesChecked = 0;
+      // Check decom progress
+      namesystem.writeLock();
+      try {
+        processPendingNodes();
+        check();
+      } finally {
+        namesystem.writeUnlock();
+      }
+      if (numBlocksChecked + numNodesChecked > 0) {
+        LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
+            numNodesChecked);
+      }
+    }
+
+    /**
+     * Pop datanodes off the pending list and into decomNodeBlocks, 
+     * subject to the maxConcurrentTrackedNodes limit.
+     */
+    private void processPendingNodes() {
+      while (!pendingNodes.isEmpty() &&
+          (maxConcurrentTrackedNodes == 0 ||
+           decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
+        decomNodeBlocks.put(pendingNodes.poll(), null);
+      }
+    }
+
+    private void check() {
+      final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>>
+          it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
+      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
+
+      while (it.hasNext()
+          && !exceededNumBlocksPerCheck()
+          && !exceededNumNodesPerCheck()) {
+        numNodesChecked++;
+        final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+            entry = it.next();
+        final DatanodeDescriptor dn = entry.getKey();
+        AbstractList<BlockInfoContiguous> blocks = entry.getValue();
+        boolean fullScan = false;
+        if (blocks == null) {
+          // This is a newly added datanode, run through its list to schedule 
+          // under-replicated blocks for replication and collect the blocks 
+          // that are insufficiently replicated for further tracking
+          LOG.debug("Newly-added node {}, doing full scan to find " +
+              "insufficiently-replicated blocks.", dn);
+          blocks = handleInsufficientlyReplicated(dn);
+          decomNodeBlocks.put(dn, blocks);
+          fullScan = true;
+        } else {
+          // This is a known datanode, check if its # of insufficiently 
+          // replicated blocks has dropped to zero and if it can be decommed
+          LOG.debug("Processing decommission-in-progress node {}", dn);
+          pruneSufficientlyReplicated(dn, blocks);
         }
-  
-        try {
-          Thread.sleep(recheckInterval);
-        } catch (InterruptedException ie) {
-          LOG.warn(this.getClass().getSimpleName() + " interrupted: " + ie);
+        if (blocks.size() == 0) {
+          if (!fullScan) {
+            // If we didn't just do a full scan, need to re-check with the 
+            // full block map.
+            //
+            // We've replicated all the known insufficiently replicated 
+            // blocks. Re-check with the full block map before finally 
+            // marking the datanode as decommissioned 
+            LOG.debug("Node {} has finished replicating current set of "
+                + "blocks, checking with the full block map.", dn);
+            blocks = handleInsufficientlyReplicated(dn);
+            decomNodeBlocks.put(dn, blocks);
+          }
+          // If the full scan is clean AND the node liveness is okay, 
+          // we can finally mark as decommissioned.
+          final boolean isHealthy =
+              blockManager.isNodeHealthyForDecommission(dn);
+          if (blocks.size() == 0 && isHealthy) {
+            setDecommissioned(dn);
+            toRemove.add(dn);
+            LOG.debug("Node {} is sufficiently replicated and healthy, "
+                + "marked as decommissioned.", dn);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              StringBuilder b = new StringBuilder("Node {} ");
+              if (isHealthy) {
+                b.append("is ");
+              } else {
+                b.append("isn't ");
+              }
+              b.append("healthy and still needs to replicate {} more blocks," +
+                  " decommissioning is still in progress.");
+              LOG.debug(b.toString(), dn, blocks.size());
+            }
+          }
+        } else {
+          LOG.debug("Node {} still has {} blocks to replicate "
+                  + "before it is a candidate to finish decommissioning.",
+              dn, blocks.size());
         }
+        iterkey = dn;
+      }
+      // Remove the datanodes that are decommissioned
+      for (DatanodeDescriptor dn : toRemove) {
+        Preconditions.checkState(dn.isDecommissioned(),
+            "Removing a node that is not yet decommissioned!");
+        decomNodeBlocks.remove(dn);
       }
     }
-    
-    private void check() {
-      final DatanodeManager dm = blockmanager.getDatanodeManager();
-      int count = 0;
-      for(Map.Entry<String, DatanodeDescriptor> entry
-          : dm.getDatanodeCyclicIteration(firstkey)) {
-        final DatanodeDescriptor d = entry.getValue();
-        firstkey = entry.getKey();
-
-        if (d.isDecommissionInProgress()) {
-          try {
-            dm.checkDecommissionState(d);
-          } catch(Exception e) {
-            LOG.warn("entry=" + entry, e);
+
+    /**
+     * Removes sufficiently replicated blocks from the block list of a 
+     * datanode.
+     */
+    private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
+        AbstractList<BlockInfoContiguous> blocks) {
+      processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
+    }
+
+    /**
+     * Returns a list of blocks on a datanode that are insufficiently 
+     * replicated, i.e. are under-replicated enough to prevent decommission.
+     * <p/>
+     * As part of this, it also schedules replication work for 
+     * any under-replicated blocks.
+     *
+     * @param datanode
+     * @return List of insufficiently replicated blocks 
+     */
+    private AbstractList<BlockInfoContiguous> handleInsufficientlyReplicated(
+        final DatanodeDescriptor datanode) {
+      AbstractList<BlockInfoContiguous> insufficient = new ChunkedArrayList<>();
+      processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
+          insufficient, false);
+      return insufficient;
+    }
+
+    /**
+     * Used while checking if decommission-in-progress datanodes can be marked
+     * as decommissioned. Combines shared logic of 
+     * pruneSufficientlyReplicated and handleInsufficientlyReplicated.
+     *
+     * @param datanode                    Datanode
+     * @param it                          Iterator over the blocks on the
+     *                                    datanode
+     * @param insufficientlyReplicated    Return parameter. If it's not null,
+     *                                    will contain the insufficiently
+     *                                    replicated-blocks from the list.
+     * @param pruneSufficientlyReplicated whether to remove sufficiently
+     *                                    replicated blocks from the iterator
+     * @return true if there are under-replicated blocks in the provided block
+     * iterator, else false.
+     */
+    private void processBlocksForDecomInternal(
+        final DatanodeDescriptor datanode,
+        final Iterator<BlockInfoContiguous> it,
+        final List<BlockInfoContiguous> insufficientlyReplicated,
+        boolean pruneSufficientlyReplicated) {
+      boolean firstReplicationLog = true;
+      int underReplicatedBlocks = 0;
+      int decommissionOnlyReplicas = 0;
+      int underReplicatedInOpenFiles = 0;
+      while (it.hasNext()) {
+        numBlocksChecked++;
+        final BlockInfoContiguous block = it.next();
+        // Remove the block from the list if it's no longer in the block map,
+        // e.g. the containing file has been deleted
+        if (blockManager.blocksMap.getStoredBlock(block) == null) {
+          LOG.trace("Removing unknown block {}", block);
+          it.remove();
+          continue;
+        }
+        BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+        if (bc == null) {
+          // Orphan block, will be invalidated eventually. Skip.
+          continue;
+        }
+
+        final NumberReplicas num = blockManager.countNodes(block);
+        final int liveReplicas = num.liveReplicas();
+        final int curReplicas = liveReplicas;
+
+        // Schedule under-replicated blocks for replication if not already
+        // pending
+        if (blockManager.isNeededReplication(block, bc.getBlockReplication(),
+            liveReplicas)) {
+          if (!blockManager.neededReplications.contains(block) &&
+              blockManager.pendingReplications.getNumReplicas(block) == 0 &&
+              namesystem.isPopulatingReplQueues()) {
+            // Process these blocks only when active NN is out of safe mode.
+            blockManager.neededReplications.add(block,
+                curReplicas,
+                num.decommissionedReplicas(),
+                bc.getBlockReplication());
           }
-          if (++count == numNodesPerCheck) {
-            return;
+        }
+
+        // Even if the block is under-replicated, 
+        // it doesn't block decommission if it's sufficiently replicated 
+        if (isSufficientlyReplicated(block, bc, num)) {
+          if (pruneSufficientlyReplicated) {
+            it.remove();
           }
+          continue;
+        }
+
+        // We've found an insufficiently replicated block.
+        if (insufficientlyReplicated != null) {
+          insufficientlyReplicated.add(block);
+        }
+        // Log if this is our first time through
+        if (firstReplicationLog) {
+          logBlockReplicationInfo(block, bc, datanode, num,
+              blockManager.blocksMap.getStorages(block));
+          firstReplicationLog = false;
+        }
+        // Update various counts
+        underReplicatedBlocks++;
+        if (bc.isUnderConstruction()) {
+          underReplicatedInOpenFiles++;
+        }
+        if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
+          decommissionOnlyReplicas++;
         }
       }
+
+      datanode.decommissioningStatus.set(underReplicatedBlocks,
+          decommissionOnlyReplicas,
+          underReplicatedInOpenFiles);
     }
   }
+
+  @VisibleForTesting
+  void runMonitor() throws ExecutionException, InterruptedException {
+    Future f = executor.submit(monitor);
+    f.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7eacfc5..736c96a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -736,10 +736,25 @@
 </property>
 
 <property>
-  <name>dfs.namenode.decommission.nodes.per.interval</name>
-  <value>5</value>
-  <description>The number of nodes namenode checks if decommission is complete
-  in each dfs.namenode.decommission.interval.</description>
+  <name>dfs.namenode.decommission.blocks.per.interval</name>
+  <value>500000</value>
+  <description>The approximate number of blocks to process per 
+      decommission interval, as defined in dfs.namenode.decommission.interval.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.decommission.max.concurrent.tracked.nodes</name>
+  <value>100</value>
+  <description>
+    The maximum number of decommission-in-progress datanodes that will be
+    tracked at one time by the namenode. Tracking a decommission-in-progress
+    datanode consumes additional NN memory proportional to the number of blocks
+    on the datanode. Having a conservative limit reduces the potential impact
+    of decommissioning a large number of nodes at once.
+      
+    A value of 0 means no limit will be enforced.
+  </description>
 </property>
 
 <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index 35c0d8c..081e40f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -26,39 +27,56 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class tests the decommissioning of nodes.
  */
 public class TestDecommission {
-  public static final Log LOG = LogFactory.getLog(TestDecommission.class);
+  public static final Logger LOG = LoggerFactory.getLogger(TestDecommission
+      .class);
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int fileSize = 16384;
@@ -90,6 +108,7 @@ public class TestDecommission {
     conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
@@ -106,7 +125,7 @@ public class TestDecommission {
     }
   }
   
-  private void writeConfigFile(Path name, ArrayList<String> nodes) 
+  private void writeConfigFile(Path name, List<String> nodes) 
     throws IOException {
     // delete if it already exists
     if (localFileSys.exists(name)) {
@@ -150,7 +169,7 @@ public class TestDecommission {
    * @param downnode - if null, there is no decommissioned node for this file.
    * @return - null if no failure found, else an error message string.
    */
-  private String checkFile(FileSystem fileSys, Path name, int repl,
+  private static String checkFile(FileSystem fileSys, Path name, int repl,
     String downnode, int numDatanodes) throws IOException {
     boolean isNodeDown = (downnode != null);
     // need a raw stream
@@ -262,7 +281,7 @@ public class TestDecommission {
   /* Ask a specific NN to stop decommission of the datanode and wait for each
    * to reach the NORMAL state.
    */
-  private void recomissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
+  private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
     LOG.info("Recommissioning node: " + decommissionedNode);
     writeConfigFile(excludeFile, null);
     refreshNodes(cluster.getNamesystem(nnIndex), conf);
@@ -280,7 +299,7 @@ public class TestDecommission {
       LOG.info("Waiting for node " + node + " to change state to "
           + state + " current state: " + node.getAdminState());
       try {
-        Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+        Thread.sleep(HEARTBEAT_INTERVAL * 500);
       } catch (InterruptedException e) {
         // nothing
       }
@@ -322,28 +341,27 @@ public class TestDecommission {
   }
   
   private void verifyStats(NameNode namenode, FSNamesystem fsn,
-      DatanodeInfo node, boolean decommissioning)
+      DatanodeInfo info, DataNode node, boolean decommissioning)
       throws InterruptedException, IOException {
-    // Do the stats check over 10 iterations
+    // Do the stats check over 10 heartbeats
     for (int i = 0; i < 10; i++) {
       long[] newStats = namenode.getRpcServer().getStats();
 
       // For decommissioning nodes, ensure capacity of the DN is no longer
       // counted. Only used space of the DN is counted in cluster capacity
-      assertEquals(newStats[0], decommissioning ? node.getDfsUsed() : 
-        node.getCapacity());
+      assertEquals(newStats[0],
+          decommissioning ? info.getDfsUsed() : info.getCapacity());
 
       // Ensure cluster used capacity is counted for both normal and
       // decommissioning nodes
-      assertEquals(newStats[1], node.getDfsUsed());
+      assertEquals(newStats[1], info.getDfsUsed());
 
       // For decommissioning nodes, remaining space from the DN is not counted
-      assertEquals(newStats[2], decommissioning ? 0 : node.getRemaining());
+      assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining());
 
       // Ensure transceiver count is same as that DN
-      assertEquals(fsn.getTotalLoad(), node.getXceiverCount());
-      
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000); // Sleep heart beat interval
+      assertEquals(fsn.getTotalLoad(), info.getXceiverCount());
+      DataNodeTestUtils.triggerHeartbeat(node);
     }
   }
 
@@ -408,14 +426,6 @@ public class TestDecommission {
   }
   
   /**
-   * Tests recommission for non federated cluster
-   */
-  @Test(timeout=360000)
-  public void testRecommission() throws IOException {
-    testRecommission(1, 6);
-  }
-
-  /**
    * Test decommission for federeated cluster
    */
   @Test(timeout=360000)
@@ -501,12 +511,12 @@ public class TestDecommission {
     //    1. the last DN would have been chosen as excess replica, given its
     //    heartbeat is considered old.
     //    Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
-    //    2. After recomissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
+    //    2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
     //    and one excess replica ( 3 )
     // After the fix,
-    //    After recomissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
+    //    After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
     Thread.sleep(slowHeartbeatDNwaitTime);
-    recomissionNode(1, decomNodeFromSBN);
+    recommissionNode(1, decomNodeFromSBN);
 
     // Step 3.b, ask ANN to recommission the first DN.
     // To verify the fix, the test makes sure the excess replica picked by ANN
@@ -525,7 +535,7 @@ public class TestDecommission {
     cluster.restartDataNode(nextToLastDNprop);
     cluster.waitActive();
     Thread.sleep(slowHeartbeatDNwaitTime);
-    recomissionNode(0, decommissionedNodeFromANN);
+    recommissionNode(0, decommissionedNodeFromANN);
 
     // Step 3.c, make sure the DN has deleted the block and report to NNs
     cluster.triggerHeartbeats();
@@ -607,69 +617,88 @@ public class TestDecommission {
     cluster.shutdown();
   }
 
+  /**
+   * Test that over-replicated blocks are deleted on recommission.
+   */
+  @Test(timeout=120000)
+  public void testRecommission() throws Exception {
+    final int numDatanodes = 6;
+    try {
+      LOG.info("Starting test testRecommission");
 
-  private void testRecommission(int numNamenodes, int numDatanodes) 
-    throws IOException {
-    LOG.info("Starting test testRecommission");
+      startCluster(1, numDatanodes, conf);
 
-    startCluster(numNamenodes, numDatanodes, conf);
-  
-    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = 
-      new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
-    for(int i = 0; i < numNamenodes; i++) {
-      namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
-    }
-    Path file1 = new Path("testDecommission.dat");
-    int replicas = numDatanodes - 1;
-      
-    for (int i = 0; i < numNamenodes; i++) {
-      ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
-      FileSystem fileSys = cluster.getFileSystem(i);
+      final Path file1 = new Path("testDecommission.dat");
+      final int replicas = numDatanodes - 1;
+
+      ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+      final FileSystem fileSys = cluster.getFileSystem();
+
+      // Write a file to n-1 datanodes
       writeFile(fileSys, file1, replicas);
-        
-      // Decommission one node. Verify that node is decommissioned.
-      DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
-          AdminStates.DECOMMISSIONED);
+
+      // Decommission one of the datanodes with a replica
+      BlockLocation loc = fileSys.getFileBlockLocations(file1, 0, 1)[0];
+      assertEquals("Unexpected number of replicas from getFileBlockLocations",
+          replicas, loc.getHosts().length);
+      final String toDecomHost = loc.getNames()[0];
+      String toDecomUuid = null;
+      for (DataNode d : cluster.getDataNodes()) {
+        if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) {
+          toDecomUuid = d.getDatanodeId().getDatanodeUuid();
+          break;
+        }
+      }
+      assertNotNull("Could not find a dn with the block!", toDecomUuid);
+      final DatanodeInfo decomNode =
+          decommissionNode(0, toDecomUuid, decommissionedNodes,
+              AdminStates.DECOMMISSIONED);
       decommissionedNodes.add(decomNode);
-        
+      final BlockManager blockManager =
+          cluster.getNamesystem().getBlockManager();
+      final DatanodeManager datanodeManager =
+          blockManager.getDatanodeManager();
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+
       // Ensure decommissioned datanode is not automatically shutdown
-      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
-      assertEquals("All datanodes must be alive", numDatanodes, 
+      DFSClient client = getDfsClient(cluster.getNameNode(), conf);
+      assertEquals("All datanodes must be alive", numDatanodes,
           client.datanodeReport(DatanodeReportType.LIVE).length);
-      int tries =0;
+
       // wait for the block to be replicated
-      while (tries++ < 20) {
-        try {
-          Thread.sleep(1000);
-          if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
-              numDatanodes) == null) {
-            break;
-          }
-        } catch (InterruptedException ie) {
-        }
-      }
-      assertTrue("Checked if block was replicated after decommission, tried "
-          + tries + " times.", tries < 20);
-
-      // stop decommission and check if the new replicas are removed
-      recomissionNode(0, decomNode);
-      // wait for the block to be deleted
-      tries = 0;
-      while (tries++ < 20) {
-        try {
-          Thread.sleep(1000);
-          if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
-            break;
+      final ExtendedBlock b = DFSTestUtil.getFirstBlock(fileSys, file1);
+      final String uuid = toDecomUuid;
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          BlockInfoContiguous info =
+              blockManager.getStoredBlock(b.getLocalBlock());
+          int count = 0;
+          StringBuilder sb = new StringBuilder("Replica locations: ");
+          for (int i = 0; i < info.numNodes(); i++) {
+            DatanodeDescriptor dn = info.getDatanode(i);
+            sb.append(dn + ", ");
+            if (!dn.getDatanodeUuid().equals(uuid)) {
+              count++;
+            }
           }
-        } catch (InterruptedException ie) {
+          LOG.info(sb.toString());
+          LOG.info("Count: " + count);
+          return count == replicas;
         }
-      }
+      }, 500, 30000);
+
+      // recommission and wait for over-replication to be fixed
+      recommissionNode(0, decomNode);
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0);
+
       cleanupFile(fileSys, file1);
-      assertTrue("Checked if node was recommissioned " + tries + " times.",
-         tries < 20);
-      LOG.info("tried: " + tries + " times before recommissioned");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
-    cluster.shutdown();
   }
   
   /**
@@ -703,20 +732,35 @@ public class TestDecommission {
       
       FSNamesystem fsn = cluster.getNamesystem(i);
       NameNode namenode = cluster.getNameNode(i);
-      DatanodeInfo downnode = decommissionNode(i, null, null,
+      
+      DatanodeInfo decomInfo = decommissionNode(i, null, null,
           AdminStates.DECOMMISSION_INPROGRESS);
+      DataNode decomNode = getDataNode(decomInfo);
       // Check namenode stats for multiple datanode heartbeats
-      verifyStats(namenode, fsn, downnode, true);
+      verifyStats(namenode, fsn, decomInfo, decomNode, true);
       
       // Stop decommissioning and verify stats
       writeConfigFile(excludeFile, null);
       refreshNodes(fsn, conf);
-      DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
-      waitNodeState(ret, AdminStates.NORMAL);
-      verifyStats(namenode, fsn, ret, false);
+      DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo);
+      DataNode retNode = getDataNode(decomInfo);
+      waitNodeState(retInfo, AdminStates.NORMAL);
+      verifyStats(namenode, fsn, retInfo, retNode, false);
     }
   }
-  
+
+  private DataNode getDataNode(DatanodeInfo decomInfo) {
+    DataNode decomNode = null;
+    for (DataNode dn: cluster.getDataNodes()) {
+      if (decomInfo.equals(dn.getDatanodeId())) {
+        decomNode = dn;
+        break;
+      }
+    }
+    assertNotNull("Could not find decomNode in cluster!", decomNode);
+    return decomNode;
+  }
+
   /**
    * Test host/include file functionality. Only datanodes
    * in the include file are allowed to connect to the namenode in a non
@@ -902,9 +946,9 @@ public class TestDecommission {
    * It is not recommended to use a registration name which is not also a
    * valid DNS hostname for the DataNode.  See HDFS-5237 for background.
    */
+  @Ignore
   @Test(timeout=360000)
-  public void testIncludeByRegistrationName() throws IOException,
-      InterruptedException {
+  public void testIncludeByRegistrationName() throws Exception {
     Configuration hdfsConf = new Configuration(conf);
     // Any IPv4 address starting with 127 functions as a "loopback" address
     // which is connected to the current host.  So by choosing 127.0.0.100
@@ -927,15 +971,22 @@ public class TestDecommission {
     refreshNodes(cluster.getNamesystem(0), hdfsConf);
 
     // Wait for the DN to be marked dead.
-    DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
-    while (true) {
-      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
-      if (info.length == 1) {
-        break;
+    LOG.info("Waiting for DN to be marked as dead.");
+    final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        BlockManagerTestUtil
+            .checkHeartbeat(cluster.getNamesystem().getBlockManager());
+        try {
+          DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
+          return info.length == 1;
+        } catch (IOException e) {
+          LOG.warn("Failed to check dead DNs", e);
+          return false;
+        }
       }
-      LOG.info("Waiting for datanode to be marked dead");
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
-    }
+    }, 500, 5000);
 
     // Use a non-empty include file with our registration name.
     // It should work.
@@ -945,18 +996,169 @@ public class TestDecommission {
     writeConfigFile(hostsFile,  nodes);
     refreshNodes(cluster.getNamesystem(0), hdfsConf);
     cluster.restartDataNode(0);
+    cluster.triggerHeartbeats();
 
     // Wait for the DN to come back.
-    while (true) {
-      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
-      if (info.length == 1) {
-        Assert.assertFalse(info[0].isDecommissioned());
-        Assert.assertFalse(info[0].isDecommissionInProgress());
-        assertEquals(registrationName, info[0].getHostName());
-        break;
+    LOG.info("Waiting for DN to come back.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        BlockManagerTestUtil
+            .checkHeartbeat(cluster.getNamesystem().getBlockManager());
+        try {
+          DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
+          if (info.length == 1) {
+            Assert.assertFalse(info[0].isDecommissioned());
+            Assert.assertFalse(info[0].isDecommissionInProgress());
+            assertEquals(registrationName, info[0].getHostName());
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to check dead DNs", e);
+        }
+        return false;
       }
-      LOG.info("Waiting for datanode to come back");
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+    }, 500, 5000);
+  }
+  
+  @Test(timeout=120000)
+  public void testBlocksPerInterval() throws Exception {
+    Configuration newConf = new Configuration(conf);
+    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+        .setLevel(Level.TRACE);
+    // Turn the blocks per interval way down
+    newConf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+        3);
+    // Disable the normal monitor runs
+    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+        Integer.MAX_VALUE);
+    startCluster(1, 3, newConf);
+    final FileSystem fs = cluster.getFileSystem();
+    final DatanodeManager datanodeManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+    // Write a 3-replica file, so each node has one block. Should scan 3 nodes.
+    DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 3);
+    // Write another file, should only scan two
+    DFSTestUtil.createFile(fs, new Path("/file2"), 64, (short)3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 2);
+    // One more file, should only scan 1
+    DFSTestUtil.createFile(fs, new Path("/file3"), 64, (short)3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 1);
+    // Blocks on each DN now exceed the limit; still scan at least one node
+    DFSTestUtil.createFile(fs, new Path("/file4"), 64, (short)3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 1);
+  }
+
+  @Deprecated
+  @Test(timeout=120000)
+  public void testNodesPerInterval() throws Exception {
+    Configuration newConf = new Configuration(conf);
+    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+        .setLevel(Level.TRACE);
+    // Set the deprecated configuration key which limits the # of nodes per 
+    // interval
+    newConf.setInt("dfs.namenode.decommission.nodes.per.interval", 1);
+    // Disable the normal monitor runs
+    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+        Integer.MAX_VALUE);
+    startCluster(1, 3, newConf);
+    final FileSystem fs = cluster.getFileSystem();
+    final DatanodeManager datanodeManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+    // Write a 3-replica file, so each node has one block. Should scan 1 node 
+    // each time.
+    DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
+    for (int i=0; i<3; i++) {
+      doDecomCheck(datanodeManager, decomManager, 1);
     }
   }
+
+  private void doDecomCheck(DatanodeManager datanodeManager,
+      DecommissionManager decomManager, int expectedNumCheckedNodes)
+      throws IOException, ExecutionException, InterruptedException {
+    // Decom all nodes
+    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+    for (DataNode d: cluster.getDataNodes()) {
+      DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
+          decommissionedNodes,
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(dn);
+    }
+    // Run decom scan and check
+    BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+    assertEquals("Unexpected # of nodes checked", expectedNumCheckedNodes, 
+        decomManager.getNumNodesChecked());
+    // Recommission all nodes
+    for (DatanodeInfo dn : decommissionedNodes) {
+      recommissionNode(0, dn);
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testPendingNodes() throws Exception {
+    Configuration newConf = new Configuration(conf);
+    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+        .setLevel(Level.TRACE);
+    // Only allow one node to be decom'd at a time
+    newConf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+        1);
+    // Disable the normal monitor runs
+    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 
+        Integer.MAX_VALUE);
+    startCluster(1, 3, newConf);
+    final FileSystem fs = cluster.getFileSystem();
+    final DatanodeManager datanodeManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+    // Keep a file open to prevent decom from progressing
+    HdfsDataOutputStream open1 =
+        (HdfsDataOutputStream) fs.create(new Path("/openFile1"), (short)3);
+    // Flush and trigger block reports so the block definitely shows up on NN
+    open1.write(123);
+    open1.hflush();
+    for (DataNode d: cluster.getDataNodes()) {
+      DataNodeTestUtils.triggerBlockReport(d);
+    }
+    // Decom two nodes, so one is still alive
+    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+    for (int i=0; i<2; i++) {
+      final DataNode d = cluster.getDataNodes().get(i);
+      DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), 
+          decommissionedNodes, 
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(dn);
+    }
+
+    for (int i=2; i>=0; i--) {
+      assertTrackedAndPending(decomManager, 0, i);
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+    }
+
+    // Close file, try to decom the last node, should get stuck in tracked
+    open1.close();
+    final DataNode d = cluster.getDataNodes().get(2);
+    DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
+        decommissionedNodes,
+        AdminStates.DECOMMISSION_INPROGRESS);
+    decommissionedNodes.add(dn);
+    BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+    
+    assertTrackedAndPending(decomManager, 1, 0);
+  }
+
+  private void assertTrackedAndPending(DecommissionManager decomManager,
+      int tracked, int pending) {
+    assertEquals("Unexpected number of tracked nodes", tracked,
+        decomManager.getNumTrackedNodes());
+    assertEquals("Unexpected number of pending nodes", pending,
+        decomManager.getNumPendingNodes());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index fccd308..f61176e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -300,9 +301,8 @@ public class BlockManagerTestUtil {
    * Have DatanodeManager check decommission state.
    * @param dm the DatanodeManager to manipulate
    */
-  public static void checkDecommissionState(DatanodeManager dm,
-      DatanodeDescriptor node) {
-    dm.checkDecommissionState(node);
+  public static void recheckDecommissionState(DatanodeManager dm)
+      throws ExecutionException, InterruptedException {
+    dm.getDecomManager().runMonitor();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index d9066e8..d514768 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -137,7 +137,7 @@ public class TestReplicationPolicyConsiderLoad {
       // returns false
       for (int i = 0; i < 3; i++) {
         DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
-        dnManager.startDecommission(d);
+        dnManager.getDecomManager().startDecommission(d);
         d.setDecommissioned();
       }
       assertEquals((double)load/3, dnManager.getFSClusterStats()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index a9aba86..789ee6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -29,7 +28,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
@@ -53,7 +51,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -90,7 +93,8 @@ public class TestDecommissioningStatus {
     conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
     Path includeFile = new Path(dir, "include");
     conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        1000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
         4);
@@ -104,6 +108,9 @@ public class TestDecommissioningStatus {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
+    cluster.getNamesystem().getBlockManager().getDatanodeManager()
+        .setHeartbeatExpireInterval(3000);
+    Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
   }
 
   @AfterClass
@@ -186,13 +193,16 @@ public class TestDecommissioningStatus {
   private void checkDecommissionStatus(DatanodeDescriptor decommNode,
       int expectedUnderRep, int expectedDecommissionOnly,
       int expectedUnderRepInOpenFiles) {
-    assertEquals(decommNode.decommissioningStatus.getUnderReplicatedBlocks(),
-        expectedUnderRep);
+    assertEquals("Unexpected num under-replicated blocks",
+        expectedUnderRep,
+        decommNode.decommissioningStatus.getUnderReplicatedBlocks());
+    assertEquals("Unexpected number of decom-only replicas",
+        expectedDecommissionOnly,
+        decommNode.decommissioningStatus.getDecommissionOnlyReplicas());
     assertEquals(
-        decommNode.decommissioningStatus.getDecommissionOnlyReplicas(),
-        expectedDecommissionOnly);
-    assertEquals(decommNode.decommissioningStatus
-        .getUnderReplicatedInOpenFiles(), expectedUnderRepInOpenFiles);
+        "Unexpected number of replicas in under-replicated open files",
+        expectedUnderRepInOpenFiles,
+        decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles());
   }
 
   private void checkDFSAdminDecommissionStatus(
@@ -244,7 +254,7 @@ public class TestDecommissioningStatus {
    * Tests Decommissioning Status in DFS.
    */
   @Test
-  public void testDecommissionStatus() throws IOException, InterruptedException {
+  public void testDecommissionStatus() throws Exception {
     InetSocketAddress addr = new InetSocketAddress("localhost", cluster
         .getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
@@ -253,7 +263,7 @@ public class TestDecommissioningStatus {
     DistributedFileSystem fileSys = cluster.getFileSystem();
     DFSAdmin admin = new DFSAdmin(cluster.getConfiguration(0));
 
-    short replicas = 2;
+    short replicas = numDatanodes;
     //
     // Decommission one node. Verify the decommission status
     // 
@@ -263,7 +273,9 @@ public class TestDecommissioningStatus {
 
     Path file2 = new Path("decommission1.dat");
     FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas);
-    Thread.sleep(5000);
+    for (DataNode d: cluster.getDataNodes()) {
+      DataNodeTestUtils.triggerBlockReport(d);
+    }
 
     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
@@ -271,19 +283,22 @@ public class TestDecommissioningStatus {
       String downnode = decommissionNode(fsn, client, localFileSys, iteration);
       dm.refreshNodes(conf);
       decommissionedNodes.add(downnode);
-      Thread.sleep(5000);
+      BlockManagerTestUtil.recheckDecommissionState(dm);
       final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
       if (iteration == 0) {
         assertEquals(decommissioningNodes.size(), 1);
         DatanodeDescriptor decommNode = decommissioningNodes.get(0);
-        checkDecommissionStatus(decommNode, 4, 0, 2);
+        checkDecommissionStatus(decommNode, 3, 0, 1);
         checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
             fileSys, admin);
       } else {
         assertEquals(decommissioningNodes.size(), 2);
         DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
         DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
-        checkDecommissionStatus(decommNode1, 4, 4, 2);
+        // This one is still 3,3,1 since it passed over the UC block 
+        // earlier, before node 2 was decommed
+        checkDecommissionStatus(decommNode1, 3, 3, 1);
+        // This one is 4,4,2 since it has the full state
         checkDecommissionStatus(decommNode2, 4, 4, 2);
         checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 2),
             fileSys, admin);
@@ -305,8 +320,7 @@ public class TestDecommissioningStatus {
    * the replication process after it rejoins the cluster.
    */
   @Test(timeout=120000)
-  public void testDecommissionStatusAfterDNRestart()
-      throws IOException, InterruptedException {
+  public void testDecommissionStatusAfterDNRestart() throws Exception {
     DistributedFileSystem fileSys =
         (DistributedFileSystem)cluster.getFileSystem();
 
@@ -345,7 +359,7 @@ public class TestDecommissioningStatus {
     BlockManagerTestUtil.checkHeartbeat(fsn.getBlockManager());
 
     // Force DatanodeManager to check decommission state.
-    BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
+    BlockManagerTestUtil.recheckDecommissionState(dm);
 
     // Verify that the DN remains in DECOMMISSION_INPROGRESS state.
     assertTrue("the node should be DECOMMISSION_IN_PROGRESSS",
@@ -359,7 +373,7 @@ public class TestDecommissioningStatus {
     // Delete the under-replicated file, which should let the 
     // DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
     cleanupFile(fileSys, f);
-    BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
+    BlockManagerTestUtil.recheckDecommissionState(dm);
     assertTrue("the node should be decommissioned",
         dead.get(0).isDecommissioned());
 
@@ -380,8 +394,9 @@ public class TestDecommissioningStatus {
    * DECOMMISSIONED
    */
   @Test(timeout=120000)
-  public void testDecommissionDeadDN()
-      throws IOException, InterruptedException, TimeoutException {
+  public void testDecommissionDeadDN() throws Exception {
+    Logger log = Logger.getLogger(DecommissionManager.class);
+    log.setLevel(Level.DEBUG);
     DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
     String dnName = dnID.getXferAddr();
     DataNodeProperties stoppedDN = cluster.stopDataNode(0);
@@ -392,7 +407,7 @@ public class TestDecommissioningStatus {
     DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
     decommissionNode(fsn, localFileSys, dnName);
     dm.refreshNodes(conf);
-    BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
     assertTrue(dnDescriptor.isDecommissioned());
 
     // Add the node back

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index 409fffc..70deb1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -1305,7 +1305,7 @@ public class TestFsck {
           .getBlockManager().getBlockCollection(eb.getLocalBlock())
           .getBlocks()[0].getDatanode(0);
       cluster.getNameNode().getNamesystem().getBlockManager()
-          .getDatanodeManager().startDecommission(dn);
+          .getDatanodeManager().getDecomManager().startDecommission(dn);
       String dnName = dn.getXferAddr();
 
       //wait for decommission start

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ee0d32b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
index 426563b..35a611b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
@@ -30,8 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSOutputStream;
@@ -240,7 +238,7 @@ public class TestNamenodeCapacityReport {
         DatanodeDescriptor dnd =
             dnm.getDatanode(datanodes.get(i).getDatanodeId());
         expectedInServiceLoad -= dnd.getXceiverCount();
-        dnm.startDecommission(dnd);
+        dnm.getDecomManager().startDecommission(dnd);
         DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
         Thread.sleep(100);
         checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);