You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/10/11 20:08:29 UTC

svn commit: r1397211 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/or...

Author: suresh
Date: Thu Oct 11 18:08:27 2012
New Revision: 1397211

URL: http://svn.apache.org/viewvc?rev=1397211&view=rev
Log:
HDFS-3912. Detect and avoid stale datanodes for writes. Contributed by Jing Zhao

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Oct 11 18:08:27 2012
@@ -13,10 +13,6 @@ Trunk (Unreleased)
     HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
     with 4-layer network topology.  (Junping Du via szetszwo)
 
-    HDFS-3703. Datanodes are marked stale if heartbeat is not received in
-    configured timeout and are selected as the last location to read from.
-    (Jing Zhao via suresh)
-
   IMPROVEMENTS
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
@@ -238,6 +234,9 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
     (Jaimin D Jetly and Jing Zhao via szetszwo)
 
+    HDFS-3912. Detect and avoid stale datanodes for writes.
+    (Jing Zhao via suresh)
+
   IMPROVEMENTS
   
     HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@@ -349,6 +348,11 @@ Release 2.0.2-alpha - 2012-09-07 
 
     HDFS-2793. Add an admin command to trigger an edit log roll. (todd)
 
+    HDFS-3703. Datanodes are marked stale if heartbeat is not received in
+    configured timeout and are selected as the last location to read from.
+    (Jing Zhao via suresh)
+    
+
   IMPROVEMENTS
 
     HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Oct 11 18:08:27 2012
@@ -180,9 +180,21 @@ public class DFSConfigKeys extends Commo
   // Whether to enable datanode's stale state detection and usage
   public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
   public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+  // Whether to avoid choosing stale datanodes as targets for writes
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
   // The default value of the time interval for marking datanodes as stale
   public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
-  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+  // The stale interval cannot be too small, since that would cause overly frequent churn of stale states.
+  // This value is a multiple of the heartbeat interval and defines the minimum allowed stale interval.
+  public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+  public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+  
+  // When the proportion of datanodes marked as stale reaches this ratio,
+  // stop avoiding writing to stale nodes so as to prevent causing hotspots.
+  public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+  public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
   // Replication monitoring related keys
   public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Thu Oct 11 18:08:27 2012
@@ -62,6 +62,8 @@ public class BlockPlacementPolicyDefault
   protected NetworkTopology clusterMap;
   private FSClusterStats stats;
   protected long heartbeatInterval;   // interval for DataNode heartbeats
+  private long staleInterval;   // interval used to identify stale DataNodes
+  
   /**
    * A miss of that many heartbeats is tolerated for replica deletion policy.
    */
@@ -78,7 +80,8 @@ public class BlockPlacementPolicyDefault
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
                          NetworkTopology clusterMap) {
-    this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
+    this.considerLoad = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
     this.heartbeatInterval = conf.getLong(
@@ -87,6 +90,9 @@ public class BlockPlacementPolicyDefault
     this.tolerateHeartbeatMultiplier = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
         DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
+    this.staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
   protected ThreadLocal<StringBuilder> threadLocalBuilder =
@@ -155,9 +161,10 @@ public class BlockPlacementPolicyDefault
       writer=null;
     }
       
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, 
-                                                maxNodesPerRack, results);
+    boolean avoidStaleNodes = (stats != null
+        && stats.isAvoidingStaleDataNodesForWrite());
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (!returnChosenNodes) {  
       results.removeAll(chosenNodes);
     }
@@ -173,8 +180,8 @@ public class BlockPlacementPolicyDefault
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
-      
+                                          List<DatanodeDescriptor> results,
+                                          final boolean avoidStaleNodes) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
@@ -185,18 +192,21 @@ public class BlockPlacementPolicyDefault
     if (writer == null && !newBlock) {
       writer = results.get(0);
     }
-      
+
+    // Keep a copy of original excludedNodes
+    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
+        new HashMap<Node, Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
+        writer = chooseLocalNode(writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
+        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
@@ -204,24 +214,36 @@ public class BlockPlacementPolicyDefault
       if (numOfResults <= 2) {
         if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
           chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
+                           blocksize, maxNodesPerRack, 
+                           results, avoidStaleNodes);
         } else if (newBlock){
           chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, avoidStaleNodes);
         } else {
-          chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
+          chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
+              results, avoidStaleNodes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       LOG.warn("Not able to place enough replicas, still in need of "
                + numOfReplicas + " to reach " + totalReplicasExpected + "\n"
                + e.getMessage());
+      if (avoidStaleNodes) {
+        // excludedNodes now contains the initial excludedNodes, any nodes that
+        // were chosen, and nodes that were tried but not chosen because they
+        // were stale, decommissioned, or unsuitable for write for any other
+        // reason. Retry, this time without avoiding stale nodes.
+        for (Node node : results) {
+          oldExcludedNodes.put(node, node);
+        }
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+            maxNodesPerRack, results, false);
+      }
     }
     return writer;
   }
@@ -236,26 +258,27 @@ public class BlockPlacementPolicyDefault
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+                                             List<DatanodeDescriptor> results,
+                                             boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     if (preferLocalNode) {
       // otherwise try local machine first
       Node oldNode = excludedNodes.put(localMachine, localMachine);
       if (oldNode == null) { // was not in the excluded list
-        if (isGoodTarget(localMachine, blocksize,
-                         maxNodesPerRack, false, results)) {
+        if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
+            results, avoidStaleNodes)) {
           results.add(localMachine);
           return localMachine;
         }
       } 
     }      
     // try a node on local rack
-    return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
+    return chooseLocalRack(localMachine, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes);
   }
     
   /* choose one node from the rack that <i>localMachine</i> is on.
@@ -270,19 +293,19 @@ public class BlockPlacementPolicyDefault
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+                                             List<DatanodeDescriptor> results,
+                                             boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     }
       
     // choose one from the local rack
     try {
-      return chooseRandom(
-                          localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
+      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -296,18 +319,17 @@ public class BlockPlacementPolicyDefault
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(
-                              newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
@@ -323,17 +345,19 @@ public class BlockPlacementPolicyDefault
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
+                                List<DatanodeDescriptor> results,
+                                boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
-      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
+      chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
+                   maxReplicasPerRack, results, avoidStaleNodes);
     }
   }
 
@@ -345,7 +369,8 @@ public class BlockPlacementPolicyDefault
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
+                                          List<DatanodeDescriptor> results,
+                                          boolean avoidStaleNodes) 
     throws NotEnoughReplicasException {
     int numOfAvailableNodes =
       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -363,7 +388,8 @@ public class BlockPlacementPolicyDefault
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
       if (oldNode == null) { // chosenNode was not in the excluded list
         numOfAvailableNodes--;
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, 
+                maxNodesPerRack, results, avoidStaleNodes)) {
           results.add(chosenNode);
           adjustExcludedNodes(excludedNodes, chosenNode);
           return chosenNode;
@@ -390,7 +416,8 @@ public class BlockPlacementPolicyDefault
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
+                            List<DatanodeDescriptor> results,
+                            boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
       
     int numOfAvailableNodes =
@@ -409,7 +436,8 @@ public class BlockPlacementPolicyDefault
       if (oldNode == null) {
         numOfAvailableNodes--;
 
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, 
+              maxNodesPerRack, results, avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
           adjustExcludedNodes(excludedNodes, chosenNode);
@@ -451,9 +479,10 @@ public class BlockPlacementPolicyDefault
    */
   private boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerRack,
-                               List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerRack,
-                        this.considerLoad, results);
+                               List<DatanodeDescriptor> results, 
+                               boolean avoidStaleNodes) {
+    return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
+        results, avoidStaleNodes);
   }
   
   /**
@@ -466,7 +495,8 @@ public class BlockPlacementPolicyDefault
    *                       the cluster and total number of replicas for a block
    * @param considerLoad whether or not to consider load of the target node
    * @param results A list containing currently chosen nodes. Used to check if 
-   *                too many nodes has been chosen in the target rack. 
+   *                too many nodes has been chosen in the target rack.
+   * @param avoidStaleNodes Whether or not to avoid choosing stale nodes
    * @return Return true if <i>node</i> has enough space, 
    *         does not have too much load, 
    *         and the rack does not have too many nodes.
@@ -474,7 +504,8 @@ public class BlockPlacementPolicyDefault
   protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
+                               List<DatanodeDescriptor> results,                           
+                               boolean avoidStaleNodes) {
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       if(LOG.isDebugEnabled()) {
@@ -485,6 +516,17 @@ public class BlockPlacementPolicyDefault
       return false;
     }
 
+    if (avoidStaleNodes) {
+      if (node.isStale(this.staleInterval)) {
+        if (LOG.isDebugEnabled()) {
+          threadLocalBuilder.get().append(node.toString()).append(": ")
+              .append("Node ").append(NodeBase.getPath(node))
+              .append(" is not chosen because the node is staled ");
+        }
+        return false;
+      }
+    }
+    
     long remaining = node.getRemaining() - 
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Thu Oct 11 18:08:27 2012
@@ -64,23 +64,20 @@ public class BlockPlacementPolicyWithNod
    * @return the chosen node
    */
   @Override
-  protected DatanodeDescriptor chooseLocalNode(
-      DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes,
-      long blocksize,
-      int maxNodesPerRack,
-      List<DatanodeDescriptor> results)
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
         throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
 
     // otherwise try local machine first
     Node oldNode = excludedNodes.put(localMachine, localMachine);
     if (oldNode == null) { // was not in the excluded list
       if (isGoodTarget(localMachine, blocksize,
-          maxNodesPerRack, false, results)) {
+          maxNodesPerRack, false, results, avoidStaleNodes)) {
         results.add(localMachine);
         // Nodes under same nodegroup should be excluded.
         addNodeGroupToExcludedNodes(excludedNodes,
@@ -92,13 +89,13 @@ public class BlockPlacementPolicyWithNod
     // try a node on local node group
     DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (chosenNode != null) {
       return chosenNode;
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
   }
 
   @Override
@@ -119,17 +116,15 @@ public class BlockPlacementPolicyWithNod
   }
 
   @Override
-  protected DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+                          blocksize, maxNodesPerRack, results, 
+                          avoidStaleNodes);
     }
 
     // choose one from the local rack, but off-nodegroup
@@ -137,7 +132,8 @@ public class BlockPlacementPolicyWithNod
       return chooseRandom(NetworkTopology.getFirstHalf(
                               localMachine.getNetworkLocation()),
                           excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, 
+                          avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -151,39 +147,39 @@ public class BlockPlacementPolicyWithNod
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(
+              clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
 
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
-          DatanodeDescriptor localMachine,
-          HashMap<Node, Node> excludedNodes,
-          long blocksize,
-          int maxReplicasPerRack,
-          List<DatanodeDescriptor> results)
-          throws NotEnoughReplicasException {
+      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
-      chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
-          localMachine.getNetworkLocation()),
-      excludedNodes, blocksize, maxReplicasPerRack, results);
+      chooseRandom(
+          numOfReplicas,
+          "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
-      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
-      localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-      maxReplicasPerRack, results);
+      chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+          localMachine.getNetworkLocation(), excludedNodes, blocksize,
+          maxReplicasPerRack, results, avoidStaleNodes);
     }
   }
 
@@ -193,19 +189,22 @@ public class BlockPlacementPolicyWithNod
    * if still no such node is available, choose a random node in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
-      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize, 
-      int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+  private DatanodeDescriptor chooseLocalNodeGroup(
+      NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-      blocksize, maxNodesPerRack, results);
+      blocksize, maxNodesPerRack, results, avoidStaleNodes);
     }
 
     // choose one from the local node group
     try {
-      return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
-      excludedNodes, blocksize, maxNodesPerRack, results);
+      return chooseRandom(
+          clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -219,17 +218,19 @@ public class BlockPlacementPolicyWithNod
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
-            excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(
+              clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+              excludedNodes, blocksize, maxNodesPerRack, results,
+              avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Thu Oct 11 18:08:27 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -88,8 +89,8 @@ public class DatanodeManager {
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
-
   private final HeartbeatManager heartbeatManager;
+  private Daemon decommissionthread = null;
 
   /**
    * Stores the datanode -> block map.  
@@ -127,28 +128,33 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
   
+  /** Whether or not to check stale DataNodes for read/write */
+  private final boolean checkForStaleDataNodes;
+
+  /** The interval for judging stale DataNodes for read/write */
+  private final long staleInterval;
+  
+  /** Whether or not to avoid using stale DataNodes for writing */
+  private volatile boolean avoidStaleDataNodesForWrite;
+  
+  /** The number of stale DataNodes */
+  private volatile int numStaleNodes;
+  
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
    * according to the NetworkTopology.
    */
   private boolean hasClusterEverBeenMultiRack = false;
   
-  /** Whether or not to check the stale datanodes */
-  private volatile boolean checkForStaleNodes;
-  /** The time interval for detecting stale datanodes */
-  private volatile long staleInterval;
-  
-  DatanodeManager(final BlockManager blockManager,
-      final Namesystem namesystem, final Configuration conf
-      ) throws IOException {
+  DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
+      final Configuration conf) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
     Class<? extends NetworkTopology> networkTopologyClass =
         conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
             NetworkTopology.class, NetworkTopology.class);
-    networktopology = (NetworkTopology) ReflectionUtils.newInstance(
-        networkTopologyClass, conf);
+    networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
@@ -181,25 +187,69 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
     LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
         + "=" + this.blockInvalidateLimit);
-    // set the value of stale interval based on configuration
-    this.checkForStaleNodes = conf.getBoolean(
+    
+    checkForStaleDataNodes = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
-    if (this.checkForStaleNodes) {
-      this.staleInterval = conf.getLong(
-          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
-          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
-      if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
-        LOG.warn("The given interval for marking stale datanode = "
-            + this.staleInterval + ", which is smaller than the default value "
-            + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
-            + ".");
-      }
+
+    staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+    avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
+        checkForStaleDataNodes);
+  }
+  
+  private static long getStaleIntervalFromConf(Configuration conf,
+      long heartbeatExpireInterval) {
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(staleInterval > 0,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
+        " = '" + staleInterval + "' is invalid. " +
+        "It should be a positive non-zero value.");
+    
+    final long heartbeatIntervalSeconds = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    // The stale interval value cannot be smaller than 
+    // 3 times of heartbeat interval 
+    final long minStaleInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
+        * heartbeatIntervalSeconds * 1000;
+    if (staleInterval < minStaleInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is less than "
+          + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+          + " heartbeat intervals. This may cause too frequent changes of " 
+          + "stale states of DataNodes since a heartbeat msg may be missing " 
+          + "due to temporary short-term failures. Reset stale interval to " 
+          + minStaleInterval + ".");
+      staleInterval = minStaleInterval;
+    }
+    if (staleInterval > heartbeatExpireInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is larger than heartbeat expire interval "
+          + heartbeatExpireInterval + ".");
     }
+    return staleInterval;
   }
-
-  private Daemon decommissionthread = null;
-
+  
+  static boolean getAvoidStaleForWriteFromConf(Configuration conf,
+      boolean checkForStale) {
+    boolean avoid = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+    boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
+    if (!checkForStale && avoid) {
+      LOG.warn("Cannot set "
+          + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+          + " as false while setting "
+          + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+          + " as true.");
+    }
+    return avoidStaleDataNodesForWrite;
+  }
+  
   void activate(final Configuration conf) {
     final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
     this.decommissionthread = new Daemon(dm.new Monitor(
@@ -253,9 +303,10 @@ public class DatanodeManager {
         client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
     }
     
-    Comparator<DatanodeInfo> comparator = checkForStaleNodes ? 
-                    new DFSUtil.DecomStaleComparator(staleInterval) : 
-                    DFSUtil.DECOM_COMPARATOR;
+    Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ? 
+        new DFSUtil.DecomStaleComparator(staleInterval) : 
+        DFSUtil.DECOM_COMPARATOR;
+        
     for (LocatedBlock b : locatedblocks) {
       networktopology.pseudoSortByDistance(client, b.getLocations());
       // Move decommissioned/stale datanodes to the bottom
@@ -723,7 +774,7 @@ public class DatanodeManager {
    * 3. Added to exclude --> start decommission.
    * 4. Removed from exclude --> stop decommission.
    */
-  private void refreshDatanodes() throws IOException {
+  private void refreshDatanodes() {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!inHostsList(node)) {
@@ -782,7 +833,61 @@ public class DatanodeManager {
       namesystem.readUnlock();
     }
   }
+  
+  /* Getter and Setter for stale DataNodes related attributes */
+  
+  /**
+   * @return whether or not to avoid writing to stale datanodes
+   */
+  public boolean isAvoidingStaleDataNodesForWrite() {
+    return avoidStaleDataNodesForWrite;
+  }
 
+  /**
+   * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}. 
+   * The HeartbeatManager disables avoidStaleDataNodesForWrite when more than
+   * half of the DataNodes are marked as stale.
+   * 
+   * @param avoidStaleDataNodesForWrite
+   *          The value to set to
+   *          {@link DatanodeManager#avoidStaleDataNodesForWrite}
+   */
+  void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
+    this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+  }
+
+  /**
+   * @return Whether or not to check stale DataNodes for R/W
+   */
+  boolean isCheckingForStaleDataNodes() {
+    return checkForStaleDataNodes;
+  }
+  
+  /**
+   * @return The time interval used to mark DataNodes as stale.
+   */
+  long getStaleInterval() {
+    return staleInterval;
+  }
+
+  /**
+   * Set the number of currently stale DataNodes. The HeartbeatManager derives
+   * this number from DataNodes' heartbeats.
+   * 
+   * @param numStaleNodes
+   *          The number of stale DataNodes to be set.
+   */
+  void setNumStaleNodes(int numStaleNodes) {
+    this.numStaleNodes = numStaleNodes;
+  }
+  
+  /**
+   * @return Return the current number of stale DataNodes (detected by
+   * HeartbeatManager). 
+   */
+  int getNumStaleNodes() {
+    return this.numStaleNodes;
+  }
 
   /** Fetch live and dead datanodes. */
   public void fetchDatanodes(final List<DatanodeDescriptor> live, 
@@ -961,7 +1066,7 @@ public class DatanodeManager {
     return nodes;
   }
   
-  private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+  private void setDatanodeDead(DatanodeDescriptor node) {
     node.setLastUpdate(0);
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Thu Oct 11 18:08:27 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Manage the heartbeats received from datanodes.
  * The datanode list and statistics are synchronized
@@ -54,18 +56,48 @@ class HeartbeatManager implements Datano
   private final long heartbeatRecheckInterval;
   /** Heartbeat monitor thread */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
-
+  /**
+   * The initial setting of end user which indicates whether or not to avoid
+   * writing to stale datanodes.
+   */
+  private final boolean initialAvoidWriteStaleNodes;
+  /**
+   * When the ratio of stale datanodes reaches this number, stop avoiding 
+   * writing to stale datanodes, i.e., continue using stale nodes for writing.
+   */
+  private final float ratioUseStaleDataNodesForWrite;
+    
   final Namesystem namesystem;
   final BlockManager blockManager;
 
-  HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
-      final Configuration conf) {
-    this.heartbeatRecheckInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
-
+  HeartbeatManager(final Namesystem namesystem,
+      final BlockManager blockManager, final Configuration conf) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
+    boolean checkStaleNodes = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+    long recheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
+    this.initialAvoidWriteStaleNodes = DatanodeManager
+        .getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
+    this.ratioUseStaleDataNodesForWrite = conf.getFloat(
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+    Preconditions.checkArgument(
+        (ratioUseStaleDataNodesForWrite > 0 && 
+            ratioUseStaleDataNodesForWrite <= 1.0f),
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
+        " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
+        "It should be a positive non-zero float value, not greater than 1.0f.");
+    
+    this.heartbeatRecheckInterval = (checkStaleNodes 
+        && initialAvoidWriteStaleNodes 
+        && staleInterval < recheckInterval) ? staleInterval : recheckInterval;
   }
 
   void activate(Configuration conf) {
@@ -210,16 +242,39 @@ class HeartbeatManager implements Datano
     if (namesystem.isInSafeMode()) {
       return;
     }
+    boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
     boolean allAlive = false;
     while (!allAlive) {
       // locate the first dead node.
       DatanodeID dead = null;
+      // check the number of stale nodes
+      int numOfStaleNodes = 0;
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
-          if (dm.isDatanodeDead(d)) {
+          if (dead == null && dm.isDatanodeDead(d)) {
             stats.incrExpiredHeartbeats();
             dead = d;
-            break;
+            if (!checkStaleNodes) {
+              break;
+            }
+          }
+          if (checkStaleNodes && 
+              d.isStale(dm.getStaleInterval())) {
+            numOfStaleNodes++;
+          }
+        }
+        
+        // Change whether to avoid using stale datanodes for writing
+        // based on proportion of stale datanodes
+        if (checkStaleNodes) {
+          dm.setNumStaleNodes(numOfStaleNodes);
+          if (numOfStaleNodes > 
+                datanodes.size() * ratioUseStaleDataNodesForWrite) {
+            dm.setAvoidStaleDataNodesForWrite(false);
+          } else {
+            if (this.initialAvoidWriteStaleNodes) {
+              dm.setAvoidStaleDataNodesForWrite(true);
+            }
           }
         }
       }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Oct 11 18:08:27 2012
@@ -251,7 +251,7 @@ public class DataNode extends Configured
   Daemon dataXceiverServer = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
-  private boolean heartbeatsDisabledForTests = false;
+  private volatile boolean heartbeatsDisabledForTests = false;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
   DataNodeMetrics metrics;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Thu Oct 11 18:08:27 2012
@@ -32,8 +32,16 @@ public interface FSClusterStats {
    * @return a count of the total number of block transfers and block
    *         writes that are currently occuring on the cluster.
    */
-
-  public int getTotalLoad() ;
+  public int getTotalLoad();
+  
+  /**
+   * Indicate whether or not the cluster is currently avoiding
+   * the use of stale DataNodes for writing.
+   * 
+   * @return True if the cluster is currently avoiding using stale DataNodes 
+   *         for writing targets, and false otherwise.
+   */
+  public boolean isAvoidingStaleDataNodesForWrite();
 }
     
     

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Oct 11 18:08:27 2012
@@ -5539,4 +5539,10 @@ public class FSNamesystem implements Nam
   public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
     this.nnResourceChecker = nnResourceChecker;
   }
+
+  @Override
+  public boolean isAvoidingStaleDataNodesForWrite() {
+    return this.blockManager.getDatanodeManager()
+        .isAvoidingStaleDataNodesForWrite();
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Thu Oct 11 18:08:27 2012
@@ -988,12 +988,28 @@
   <name>dfs.namenode.check.stale.datanode</name>
   <value>false</value>
   <description>
-  	Indicate whether or not to check "stale" datanodes whose 
-  	heartbeat messages have not been received by the namenode 
-  	for more than a specified time interval. If this configuration 
-  	parameter is set as true, the stale datanodes will be moved to 
-  	the end of the target node list for reading. The writing will 
-  	also try to avoid stale nodes.
+    Indicate whether or not to check "stale" datanodes whose 
+    heartbeat messages have not been received by the namenode 
+    for more than a specified time interval. If this configuration 
+    parameter is set as true, the system will keep track 
+    of the number of stale datanodes. The stale datanodes will be 
+    moved to the end of the node list returned for reading. See
+    dfs.namenode.avoid.write.stale.datanode for details on how this 
+    affects writes. 
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.avoid.write.stale.datanode</name>
+  <value>false</value>
+  <description>
+    Indicate whether or not to avoid writing to "stale" datanodes whose 
+    heartbeat messages have not been received by the namenode 
+    for more than a specified time interval. If this configuration 
+    parameter and dfs.namenode.check.stale.datanode are both set as true, 
+    the writing will avoid using stale datanodes unless a high number 
+    of datanodes are marked as stale. See 
+    dfs.namenode.write.stale.datanode.ratio for details.
   </description>
 </property>
 
@@ -1001,10 +1017,24 @@
   <name>dfs.namenode.stale.datanode.interval</name>
   <value>30000</value>
   <description>
-  	Default time interval for marking a datanode as "stale", i.e., if 
-  	the namenode has not received heartbeat msg from a datanode for 
-  	more than this time interval, the datanode will be marked and treated 
-  	as "stale" by default.
+    Default time interval for marking a datanode as "stale", i.e., if 
+    the namenode has not received heartbeat msg from a datanode for 
+    more than this time interval, the datanode will be marked and treated 
+    as "stale" by default. The stale interval cannot be too small since 
+    otherwise this may cause too frequent change of stale states. 
+    We thus set a minimum stale interval value (the default value is 3 times 
+    of heartbeat interval) and guarantee that the stale interval cannot be less
+    than the minimum value.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.write.stale.datanode.ratio</name>
+  <value>0.5f</value>
+  <description>
+    When the ratio of the number of stale datanodes to the total number
+    of datanodes is greater than this ratio, stop avoiding writing to
+    stale nodes so as to prevent causing hotspots.
   </description>
 </property>
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Thu Oct 11 18:08:27 2012
@@ -38,9 +38,12 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.Time;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -55,6 +58,9 @@ public class TestReplicationPolicy {
   private static BlockPlacementPolicy replicator;
   private static final String filename = "/dummyfile.txt";
   private static DatanodeDescriptor dataNodes[];
+  // The interval for marking a datanode as stale.
+  private static long staleInterval = 
+      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
@@ -77,6 +83,8 @@ public class TestReplicationPolicy {
         "test.build.data", "build/test/data"), "dfs/");
     conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
         new File(baseDir, "name").getPath());
+    // Enable the checking for stale datanodes in the beginning
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
 
     DFSTestUtil.formatNameNode(conf);
     namenode = new NameNode(conf);
@@ -229,7 +237,7 @@ public class TestReplicationPolicy {
     assertEquals(2, targets.length);
     //make sure that the chosen node is in the target.
     int i = 0;
-    for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+    for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
     assertTrue(i < targets.length);
   }
 
@@ -369,6 +377,202 @@ public class TestReplicationPolicy {
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));    
   }
+
+  private boolean containsWithinRange(DatanodeDescriptor target,
+      DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
+    assert startIndex >= 0 && startIndex < nodes.length;
+    assert endIndex >= startIndex && endIndex < nodes.length;
+    for (int i = startIndex; i <= endIndex; i++) {
+      if (nodes[i].equals(target)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  @Test
+  public void testChooseTargetWithStaleNodes() throws Exception {
+    // Enable avoiding writing to stale datanodes
+    namenode.getNamesystem().getBlockManager().getDatanodeManager()
+        .setAvoidStaleDataNodesForWrite(true);
+    // Set dataNodes[0] as stale
+    dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
+
+    DatanodeDescriptor[] targets;
+    // We set the datanode[0] as stale, thus should choose datanode[1] since
+    // datanode[1] is on the same rack with datanode[0] (writer)
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[1]);
+
+    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
+    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+    targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    
+    // reset
+    namenode.getNamesystem().getBlockManager().getDatanodeManager()
+        .setAvoidStaleDataNodesForWrite(false);
+    dataNodes[0].setLastUpdate(Time.now());
+  }
+
+  /**
+   * In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale,
+   * and when the number of replicas is less than or equal to 3, all the
+   * healthy datanodes should be returned by the chooseTarget method. When the
+   * number of replicas is 4, a stale node should be included.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTargetWithHalfStaleNodes() throws Exception {
+    // Enable stale datanodes checking
+    namenode.getNamesystem().getBlockManager().getDatanodeManager()
+        .setAvoidStaleDataNodesForWrite(true);
+    // Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
+    for (int i = 0; i < 3; i++) {
+      dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    // dataNodes[0] ~ dataNodes[2] are stale, thus the chosen target should
+    // be one of the remaining (non-stale) datanodes
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
+
+    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
+    assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
+
+    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
+    assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
+    assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
+
+    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
+    assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
+    assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
+
+    // reset
+    namenode.getNamesystem().getBlockManager().getDatanodeManager()
+        .setAvoidStaleDataNodesForWrite(false);
+    for (int i = 0; i < dataNodes.length; i++) {
+      dataNodes[i].setLastUpdate(Time.now());
+    }
+  }
+
+  @Test
+  public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
+    String[] hosts = new String[]{"host1", "host2", "host3", 
+                                  "host4", "host5", "host6"};
+    String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2", 
+                                  "/d1/r2", "/d2/r3", "/d2/r3"};
+    MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).racks(racks)
+        .hosts(hosts).numDataNodes(hosts.length).build();
+    miniCluster.waitActive();
+    
+    try {
+      // Step 1. Mark two datanodes as stale, check whether the
+      // avoidStaleDataNodesForWrite calculation is correct.
+      // First stop the heartbeat of host1 and host2
+      for (int i = 0; i < 2; i++) {
+        DataNode dn = miniCluster.getDataNodes().get(i);
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+        miniCluster.getNameNode().getNamesystem().getBlockManager()
+            .getDatanodeManager().getDatanode(dn.getDatanodeId())
+            .setLastUpdate(Time.now() - staleInterval - 1);
+      }
+      // Instead of waiting, explicitly call heartbeatCheck to
+      // let the heartbeat manager detect stale nodes
+      miniCluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+      int numStaleNodes = miniCluster.getNameNode().getNamesystem()
+          .getBlockManager().getDatanodeManager().getNumStaleNodes();
+      assertEquals(numStaleNodes, 2);
+      assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+      // Call chooseTarget
+      DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
+          .getNamesystem().getBlockManager().getDatanodeManager()
+          .getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
+      BlockPlacementPolicy replicator = miniCluster.getNameNode()
+          .getNamesystem().getBlockManager().getBlockPlacementPolicy();
+      DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
+          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(targets.length, 3);
+      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      
+      // Step 2. Set more than half of the datanodes as stale
+      for (int i = 0; i < 4; i++) {
+        DataNode dn = miniCluster.getDataNodes().get(i);
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+        miniCluster.getNameNode().getNamesystem().getBlockManager()
+            .getDatanodeManager().getDatanode(dn.getDatanodeId())
+            .setLastUpdate(Time.now() - staleInterval - 1);
+      }
+      // Explicitly call heartbeatCheck
+      miniCluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+      numStaleNodes = miniCluster.getNameNode().getNamesystem()
+          .getBlockManager().getDatanodeManager().getNumStaleNodes();
+      assertEquals(numStaleNodes, 4);
+      // According to our strategy, stale datanodes will be included for writing
+      // to avoid hotspots
+      assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().isAvoidingStaleDataNodesForWrite());     
+      // Call chooseTarget
+      targets = replicator.chooseTarget(filename, 3,
+          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(targets.length, 3);
+      assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      
+      // Step 3. Set 2 stale datanodes back to healthy nodes, 
+      // still have 2 stale nodes
+      for (int i = 2; i < 4; i++) {
+        DataNode dn = miniCluster.getDataNodes().get(i);
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+        miniCluster.getNameNode().getNamesystem().getBlockManager()
+            .getDatanodeManager().getDatanode(dn.getDatanodeId())
+            .setLastUpdate(Time.now());
+      }
+      // Explicitly call heartbeatCheck
+      miniCluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+      numStaleNodes = miniCluster.getNameNode().getNamesystem()
+          .getBlockManager().getDatanodeManager().getNumStaleNodes();
+      assertEquals(numStaleNodes, 2);
+      assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+      // Call chooseTarget
+      targets = replicator.chooseTarget(filename, 3,
+          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(targets.length, 3);
+      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+    } finally {
+      miniCluster.shutdown();
+    }
+  }
   
   /**
    * This testcase tests re-replication, when dataNodes[0] is already chosen.
@@ -490,8 +694,8 @@ public class TestReplicationPolicy {
         .format(true).build();
     try {
       cluster.waitActive();
-      final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
-          .getNameNode().getNamesystem().getBlockManager().neededReplications;
+      final UnderReplicatedBlocks neededReplications = cluster.getNameNode()
+          .getNamesystem().getBlockManager().neededReplications;
       for (int i = 0; i < 100; i++) {
         // Adding the blocks directly to normal priority
         neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
@@ -529,10 +733,10 @@ public class TestReplicationPolicy {
       // Adding QUEUE_VERY_UNDER_REPLICATED block
       underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
 
-      // Adding QUEUE_UNDER_REPLICATED block
+      // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
       underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
 
-      // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
+      // Adding QUEUE_UNDER_REPLICATED block
       underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
 
       // Adding QUEUE_WITH_CORRUPT_BLOCKS block
@@ -618,6 +822,11 @@ public class TestReplicationPolicy {
     dataNodes[5].setRemaining(1*1024*1024);
     replicaNodeList.add(dataNodes[5]);
     
+    // Refresh the last update time for all the datanodes
+    for (int i = 0; i < dataNodes.length; i++) {
+      dataNodes[i].setLastUpdate(Time.now());
+    }
+    
     List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
     List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
     replicator.splitNodesWithRack(