You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/10/17 04:08:24 UTC

svn commit: r1399076 - in /hadoop/common/branches/branch-1: ./ src/hdfs/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/namenode/

Author: suresh
Date: Wed Oct 17 02:08:23 2012
New Revision: 1399076

URL: http://svn.apache.org/viewvc?rev=1399076&view=rev
Log:
HDFS-3912. Detect and avoid stale datanodes for writes. Contributed by Jing Zhao.

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestGetBlocks.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Oct 17 02:08:23 2012
@@ -26,6 +26,9 @@ Release 1.2.0 - unreleased
     MAPREDUCE-461. Enable service-plugins for JobTracker. (Fredrik Hedberg and
     Brandon Li via vinodkv)
 
+    HDFS-3912. Detect and avoid stale datanodes for writes.
+    (Jing Zhao via suresh)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml Wed Oct 17 02:08:23 2012
@@ -616,12 +616,28 @@ creations/deletions), or "all".</descrip
   <name>dfs.namenode.check.stale.datanode</name>
   <value>false</value>
   <description>
-  	Indicate whether or not to check "stale" datanodes whose 
-  	heartbeat messages have not been received by the namenode 
-  	for more than a specified time interval. If this configuration 
-  	parameter is set as true, the stale datanodes will be moved to 
-  	the end of the target node list for reading. The writing will 
-  	also try to avoid stale nodes.
+    Indicate whether or not to check "stale" datanodes whose 
+    heartbeat messages have not been received by the namenode 
+    for more than a specified time interval. If this configuration 
+    parameter is set as true, the system will keep track 
+    of the number of stale datanodes. The stale datanodes will be 
+    moved to the end of the node list returned for reading. See
+    dfs.namenode.avoid.write.stale.datanode for details on how this 
+    affects writes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.avoid.write.stale.datanode</name>
+  <value>false</value>
+  <description>
+    Indicate whether or not to avoid writing to "stale" datanodes whose 
+    heartbeat messages have not been received by the namenode 
+    for more than a specified time interval. If this configuration 
+    parameter and dfs.namenode.check.stale.datanode are both set as true, 
+    the writing will avoid using stale datanodes unless a high number 
+    of datanodes are marked as stale. See 
+    dfs.namenode.write.stale.datanode.ratio for details.
   </description>
 </property>
 
@@ -629,10 +645,24 @@ creations/deletions), or "all".</descrip
   <name>dfs.namenode.stale.datanode.interval</name>
   <value>30000</value>
   <description>
-  	Default time interval for marking a datanode as "stale", i.e., if 
-  	the namenode has not received heartbeat msg from a datanode for 
-  	more than this time interval, the datanode will be marked and treated 
-  	as "stale" by default.
+    Default time interval for marking a datanode as "stale", i.e., if 
+    the namenode has not received heartbeat msg from a datanode for 
+    more than this time interval, the datanode will be marked and treated 
+    as "stale" by default. The stale interval cannot be too small since 
+    otherwise this may cause too frequent change of stale states. 
+    We thus set a minimum stale interval value (the default value is 3 times 
+    the heartbeat interval) and guarantee that the stale interval cannot be less
+    than the minimum value.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.write.stale.datanode.ratio</name>
+  <value>0.5f</value>
+  <description>
+    When the ratio of the number of datanodes marked as stale to the total
+    number of datanodes is greater than this ratio, stop avoiding writing to
+    stale nodes so as to prevent causing hotspots.
   </description>
 </property>
 

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Oct 17 02:08:23 2012
@@ -150,9 +150,20 @@ public class DFSConfigKeys extends Commo
   // Whether to enable datanode's stale state detection and usage
   public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
   public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+  // Whether to avoid using stale datanodes for writing
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
   // The default value of the time interval for marking datanodes as stale
   public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
-  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+  // The stale interval cannot be too small since otherwise this may cause too frequent churn on stale states. 
+  // This value is the number of heartbeat intervals that defines the minimum value for the stale interval.
+  public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+  public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+  // When the ratio of datanodes marked as stale reaches this value,
+  // stop avoiding writing to stale nodes so as to prevent causing hotspots.
+  public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+  public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
   //Code in hdfs is not updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Wed Oct 17 02:08:23 2012
@@ -17,9 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.commons.logging.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -27,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
-import java.util.*;
 
 /** The class is responsible for choosing the desired number of targets
  * for placing block replicas.
@@ -42,6 +50,7 @@ public class BlockPlacementPolicyDefault
   private boolean considerLoad; 
   private NetworkTopology clusterMap;
   private FSClusterStats stats;
+  private long staleInterval;   // interval used to identify stale DataNodes
 
   BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
                            NetworkTopology clusterMap) {
@@ -54,9 +63,13 @@ public class BlockPlacementPolicyDefault
   /** {@inheritDoc} */
   public void initialize(Configuration conf,  FSClusterStats stats,
                          NetworkTopology clusterMap) {
-    this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+    this.considerLoad = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
   /** {@inheritDoc} */
@@ -125,8 +138,10 @@ public class BlockPlacementPolicyDefault
       writer=null;
     }
       
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, maxNodesPerRack, results);
+    boolean avoidStaleNodes = (stats != null && stats
+        .isAvoidingStaleDataNodesForWrite());
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
       
     results.removeAll(chosenNodes);
       
@@ -137,11 +152,9 @@ public class BlockPlacementPolicyDefault
     
   /* choose <i>numOfReplicas</i> from all data nodes */
   private DatanodeDescriptor chooseTarget(int numOfReplicas,
-                                          DatanodeDescriptor writer,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
+      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
+      long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) {
       
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
@@ -152,42 +165,57 @@ public class BlockPlacementPolicyDefault
     if (writer == null && !newBlock) {
       writer = results.get(0);
     }
-      
+        
+    // Keep a copy of original excludedNodes
+    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
+        new HashMap<Node, Node>(excludedNodes) : null;
+    
     try {
       if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
+        writer = chooseLocalNode(writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
+        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 2) {
         if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
-          chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
+          chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         } else if (newBlock){
-          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+          chooseLocalRack(results.get(1), excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         } else {
-          chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
+          chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
+              results, avoidStaleNodes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
                + numOfReplicas);
+      if (avoidStaleNodes) {
+        // excludedNodes now contains the initial excludedNodes, the nodes that
+        // were chosen, and the nodes that were tried but rejected (stale,
+        // decommissioned, or not chosen for write for any other reason).
+        // Retry, this time without avoiding stale nodes.
+        for (Node node : results) {
+          oldExcludedNodes.put(node, node);
+        }
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+            maxNodesPerRack, results, false);
+      }
     }
     return writer;
   }
@@ -197,31 +225,28 @@ public class BlockPlacementPolicyDefault
    * choose a node on the same rack
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNode(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+  private DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
       
     // otherwise try local machine first
     Node oldNode = excludedNodes.put(localMachine, localMachine);
     if (oldNode == null) { // was not in the excluded list
-      if (isGoodTarget(localMachine, blocksize,
-                       maxNodesPerRack, false, results)) {
+      if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
+          results, avoidStaleNodes)) {
         results.add(localMachine);
         return localMachine;
       }
     } 
       
     // try a node on local rack
-    return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
+    return chooseLocalRack(localMachine, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes);
   }
     
   /* choose one node from the rack that <i>localMachine</i> is on.
@@ -231,24 +256,20 @@ public class BlockPlacementPolicyDefault
    * in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+  private DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     }
       
     // choose one from the local rack
     try {
-      return chooseRandom(
-                          localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
+      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -262,18 +283,17 @@ public class BlockPlacementPolicyDefault
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(
-                              newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+          // otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
@@ -289,29 +309,31 @@ public class BlockPlacementPolicyDefault
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
+                                List<DatanodeDescriptor> results,
+                                boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
-      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
+      chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
+                   maxReplicasPerRack, results, avoidStaleNodes);
     }
   }
 
   /* Randomly choose one target from <i>nodes</i>.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseRandom(
-                                          String nodes,
+  private DatanodeDescriptor chooseRandom(String nodes,
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
+                                          List<DatanodeDescriptor> results,
+                                          boolean avoidStaleNodes) 
     throws NotEnoughReplicasException {
     int numOfAvailableNodes =
       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -322,7 +344,8 @@ public class BlockPlacementPolicyDefault
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
       if (oldNode == null) { // choosendNode was not in the excluded list
         numOfAvailableNodes--;
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
+            avoidStaleNodes)) {
           results.add(chosenNode);
           return chosenNode;
         }
@@ -340,7 +363,8 @@ public class BlockPlacementPolicyDefault
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
+                            List<DatanodeDescriptor> results,
+                            boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
       
     int numOfAvailableNodes =
@@ -352,7 +376,8 @@ public class BlockPlacementPolicyDefault
       if (oldNode == null) {
         numOfAvailableNodes--;
 
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
+            avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
         }
@@ -369,17 +394,34 @@ public class BlockPlacementPolicyDefault
    * return true if <i>node</i> has enough space, 
    * does not have too much load, and the rack does not have too many nodes
    */
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerLoc,
-                        this.considerLoad, results);
+  private boolean isGoodTarget(DatanodeDescriptor node, long blockSize,
+      int maxTargetPerLoc, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc, this.considerLoad,
+        results, avoidStaleNodes);
   }
-    
+   
+  /**
+   * Determine if a node is a good target. 
+   * 
+   * @param node The target node
+   * @param blockSize Size of block
+   * @param maxTargetPerLoc Maximum number of targets per rack. The value of 
+   *                        this parameter depends on the number of racks in 
+   *                        the cluster and total number of replicas for a block
+   * @param considerLoad whether or not to consider load of the target node
+   * @param results A list containing currently chosen nodes. Used to check if 
+   *                too many nodes have been chosen in the target rack. 
+   * @param avoidStaleNodes Whether or not to avoid choosing stale nodes.               
+   * @return Return true if <i>node</i> has enough space, 
+   *         does not have too much load, 
+   *         and the rack does not have too many nodes.
+   */
   private boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerLoc,
                                boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
+                               List<DatanodeDescriptor> results,
+                               boolean avoidStaleNodes) {
     Log logr = FSNamesystem.LOG;
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
@@ -388,6 +430,14 @@ public class BlockPlacementPolicyDefault
       return false;
     }
 
+    if (avoidStaleNodes) {
+      if (node.isStale(this.staleInterval)) {
+        logr.debug("Node "+NodeBase.getPath(node)+
+            " is not chosen because the node is (being) stale");
+        return false;
+      }
+    }
+    
     long remaining = node.getRemaining() - 
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Wed Oct 17 02:08:23 2012
@@ -29,7 +29,15 @@ public interface FSClusterStats {
    * @return a count of the total number of block transfers and block
    *         writes that are currently occuring on the cluster.
    */
+  public int getTotalLoad();
 
-  public int getTotalLoad() ;
+  /**
+   * Indicate whether or not the cluster is currently avoiding the use of stale
+   * DataNodes for writing.
+   * 
+   * @return True if the cluster is currently avoiding using stale DataNodes for
+   *         writing targets, and false otherwise.
+   */
+  public boolean isAvoidingStaleDataNodesForWrite();
 }
     
\ No newline at end of file

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Oct 17 02:08:23 2012
@@ -365,10 +365,20 @@ public class FSNamesystem implements FSC
   private long accessTimePrecision = 0;
   private String nameNodeHostName;
   
-  /** Whether or not to check the stale datanodes */
-  private volatile boolean checkForStaleNodes;
-  /** The time interval for detecting stale datanodes */
-  private volatile long staleInterval;
+  /** Whether or not to check stale DataNodes for read/write */
+  private boolean checkForStaleDataNodes;
+  /** The interval for judging stale DataNodes for read/write */
+  private long staleInterval;
+  /** Whether or not to avoid using stale DataNodes for writing */
+  private volatile boolean avoidStaleDataNodesForWrite;
+  private boolean initialAvoidWriteStaleNodes;
+  /** The number of stale DataNodes */
+  private volatile int numStaleNodes;
+  /**
+   * When the ratio of stale datanodes reaches this number, stop avoiding
+   * writing to stale datanodes, i.e., continue using stale nodes for writing.
+   */
+  private float ratioUseStaleDataNodesForWrite;
   
   /**
    * FSNamesystem constructor.
@@ -577,20 +587,85 @@ public class FSNamesystem implements FSC
         + " min(s)");
     
     // set the value of stale interval based on configuration
-    this.checkForStaleNodes = conf.getBoolean(
+    checkForStaleDataNodes = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
-    if (this.checkForStaleNodes) {
-      this.staleInterval = conf.getLong(
-          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
-          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
-      if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
-        LOG.warn("The given interval for marking stale datanode = "
-            + this.staleInterval + ", which is smaller than the default value "
-            + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
-            + ".");
-      }
+    staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+    avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
+        checkForStaleDataNodes);
+    initialAvoidWriteStaleNodes = avoidStaleDataNodesForWrite;
+    ratioUseStaleDataNodesForWrite = 
+        getRatioUseStaleNodesForWriteFromConf(conf);
+  }
+  
+  private static float getRatioUseStaleNodesForWriteFromConf(Configuration conf) {
+    float ratioUseStaleDataNodesForWrite = conf.getFloat(
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+    if (ratioUseStaleDataNodesForWrite > 0
+        && ratioUseStaleDataNodesForWrite <= 1.0f) {
+      return ratioUseStaleDataNodesForWrite;
+    } else {
+      throw new IllegalArgumentException(
+          DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY
+              + " = '" + ratioUseStaleDataNodesForWrite
+              + "' is invalid. It should be a positive non-zero float value,"
+              + " not greater than 1.0f.");
+    }
+  }
+  
+  private static long getStaleIntervalFromConf(Configuration conf,
+      long heartbeatExpireInterval) {
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+    if (staleInterval <= 0) {
+      throw new IllegalArgumentException(
+          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY + " = '"
+              + staleInterval
+              + "' is invalid. It should be a positive non-zero value.");
+    }
+    final long heartbeatIntervalSeconds = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    // The stale interval value cannot be smaller than the configured minimum
+    // number of heartbeat intervals (3 by default)
+    final long minStaleInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
+        * heartbeatIntervalSeconds * 1000;
+    if (staleInterval < minStaleInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is less than "
+          + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+          + " heartbeat intervals. This may cause too frequent changes of "
+          + "stale states of DataNodes since a heartbeat msg may be missing "
+          + "due to temporary short-term failures. Reset stale interval to "
+          + minStaleInterval + ".");
+      staleInterval = minStaleInterval;
+    }
+    if (staleInterval > heartbeatExpireInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is larger than heartbeat expire interval "
+          + heartbeatExpireInterval + ".");
+    }
+    return staleInterval;
+  }
+
+  static boolean getAvoidStaleForWriteFromConf(Configuration conf,
+      boolean checkForStale) {
+    boolean avoid = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+    boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
+    if (!checkForStale && avoid) {
+      LOG.warn("Cannot set "
+          + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+          + " as false while setting "
+          + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+          + " as true.");
     }
+    return avoidStaleDataNodesForWrite;
   }
 
   /**
@@ -955,13 +1030,13 @@ public class FSNamesystem implements FSC
       DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
           clientMachine);
       DFSUtil.StaleComparator comparator = null;
-      if (checkForStaleNodes) {
+      if (checkForStaleDataNodes) {
         comparator = new DFSUtil.StaleComparator(staleInterval);
       }
       // Note: the last block is also included and sorted
       for (LocatedBlock b : blocks.getLocatedBlocks()) {
         clusterMap.pseudoSortByDistance(client, b.getLocations());
-        if (checkForStaleNodes) {
+        if (checkForStaleDataNodes) {
           Arrays.sort(b.getLocations(), comparator);
         }
       }
@@ -2698,7 +2773,7 @@ public class FSNamesystem implements FSC
             (now() - heartbeatExpireInterval));
   }
     
-  private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+  private void setDatanodeDead(DatanodeDescriptor node) {
     node.setLastUpdate(0);
   }
 
@@ -3490,17 +3565,40 @@ public class FSNamesystem implements FSC
     boolean allAlive = false;
     while (!allAlive) {
       boolean foundDead = false;
-      DatanodeID nodeID = null;
-
-      // locate the first dead node.
-      synchronized(heartbeats) {
-        for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
-             it.hasNext();) {
+      DatanodeID dead = null;
+      // number of stale nodes seen during this pass
+      int numOfStaleNodes = 0;
+
+      // Locate the first dead node. If stale-node checking is enabled,
+      // also count the number of stale nodes.
+      synchronized (heartbeats) {
+        for (Iterator<DatanodeDescriptor> it = heartbeats.iterator(); it
+            .hasNext();) {
           DatanodeDescriptor nodeInfo = it.next();
-          if (isDatanodeDead(nodeInfo)) {
+          if (dead == null && isDatanodeDead(nodeInfo)) {
             foundDead = true;
-            nodeID = nodeInfo;
-            break;
+            dead = nodeInfo;
+            if (!this.checkForStaleDataNodes) {
+              break;
+            }
+          }
+          if (this.checkForStaleDataNodes
+              && nodeInfo.isStale(this.staleInterval)) {
+            numOfStaleNodes++;
+          }
+        }
+
+        // Change whether to avoid using stale datanodes for writing
+        // based on proportion of stale datanodes
+        if (this.checkForStaleDataNodes) {
+          this.numStaleNodes = numOfStaleNodes;
+          if (numOfStaleNodes > heartbeats.size()
+              * this.ratioUseStaleDataNodesForWrite) {
+            this.avoidStaleDataNodesForWrite = false;
+          } else {
+            if (this.initialAvoidWriteStaleNodes) {
+              this.avoidStaleDataNodesForWrite = true;
+            }
           }
         }
       }
@@ -3512,7 +3610,7 @@ public class FSNamesystem implements FSC
             synchronized (datanodeMap) {
               DatanodeDescriptor nodeInfo = null;
               try {
-                nodeInfo = getDatanode(nodeID);
+                nodeInfo = getDatanode(dead);
               } catch (IOException e) {
                 nodeInfo = null;
               }
@@ -6121,4 +6219,41 @@ public class FSNamesystem implements FSC
   public String toString() {
     return getClass().getSimpleName() + ": " + host2DataNodeMap;
   }
+    
+  /**
+   * @return the current number of stale DataNodes (detected by
+   *         HeartbeatMonitor).
+   */
+  public int getNumStaleNodes() {
+    return this.numStaleNodes;
+  }
+
+  /**
+   * @return whether or not to avoid writing to stale datanodes
+   */
+  @Override // FSClusterStats
+  public boolean isAvoidingStaleDataNodesForWrite() {
+    return avoidStaleDataNodesForWrite;
+  }
+
+  /**
+   * @return The interval used to judge whether or not a DataNode is stale
+   */
+  public long getStaleInterval() {
+    return this.staleInterval;
+  }
+
+  /**
+   * Set the value of {@link #avoidStaleDataNodesForWrite}. The heartbeat
+   * check disables avoidStaleDataNodesForWrite when the number of stale
+   * DataNodes exceeds ratioUseStaleDataNodesForWrite of all DataNodes.
+   * 
+   * @param avoidStaleDataNodesForWrite
+   *          The value to set to
+   *          {@link #avoidStaleDataNodesForWrite}
+   */
+  void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
+    this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+  }
+  
 }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestGetBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestGetBlocks.java?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestGetBlocks.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestGetBlocks.java Wed Oct 17 02:08:23 2012
@@ -46,7 +46,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.Test;
 /**
  * This class tests if block replacement request to data nodes work correctly.
  */
@@ -68,11 +67,11 @@ public class TestGetBlocks extends TestC
   public void testReadSelectNonStaleDatanode() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
-    long staleInterval = 30 * 1000 * 60;
     // DataNode will send out heartbeat every 15 minutes
     // In this way, when we have set a datanode as stale,
     // its heartbeat will not come to refresh its state
     long heartbeatInterval = 15 * 60;
+    long staleInterval = 3 * heartbeatInterval * 1000;
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
         staleInterval);
     conf.setLong("dfs.heartbeat.interval", heartbeatInterval);

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=1399076&r1=1399075&r2=1399076&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Wed Oct 17 02:08:23 2012
@@ -29,8 +29,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 
@@ -51,6 +53,9 @@ public class TestReplicationPolicy exten
       new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d2/r3"),
       new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3")
     };
+  // The interval for marking a datanode as stale.
+  private static final long staleInterval = 
+      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
    
   private final static DatanodeDescriptor NODE = 
     new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r4");
@@ -321,6 +326,91 @@ public class TestReplicationPolicy exten
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));    
   }
   
+  private boolean containsWithinRange(DatanodeDescriptor target,
+      DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
+    assert startIndex >= 0 && startIndex < nodes.length;
+    assert endIndex >= startIndex && endIndex < nodes.length;
+    boolean found = false;
+    // Scan the inclusive range [startIndex, endIndex]; stop at first match.
+    for (int idx = startIndex; idx <= endIndex && !found; idx++) {
+      found = nodes[idx].equals(target);
+    }
+    return found;
+  }
+
+  public void testChooseTargetWithStaleNodes() throws Exception {
+    // Enable avoiding writing to stale DataNodes
+    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
+    // Set dataNodes[0] as stale
+    dataNodes[0].setLastUpdate(System.currentTimeMillis() - staleInterval - 1);
+
+    DatanodeDescriptor[] targets;
+    // We set the dataNodes[0] as stale, thus should choose dataNodes[1] since
+    // dataNodes[1] is on the same rack with dataNodes[0] (writer)
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0], BLOCK_SIZE);
+    assertEquals(1, targets.length);
+    assertEquals(dataNodes[1], targets[0]);
+
+    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
+    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
+        excludedNodes, BLOCK_SIZE);
+    assertEquals(1, targets.length);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+
+    // reset the shared fixture state touched by this test
+    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(false);
+    dataNodes[0].setLastUpdate(System.currentTimeMillis());
+  }
+
+  /**
+   * In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale,
+   * and when the number of replicas is less than or equal to 3, only healthy
+   * datanodes should be returned by the chooseTarget method. When the number
+   * of replicas is 4, a stale node should be included.
+   * 
+   * @throws Exception if choosing targets fails unexpectedly
+   */
+  public void testChooseTargetWithHalfStaleNodes() throws Exception {
+    // Enable avoiding writing to stale DataNodes
+    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
+    // Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
+    for (int i = 0; i < 3; i++) {
+      dataNodes[i]
+          .setLastUpdate(System.currentTimeMillis() - staleInterval - 1);
+    }
+
+    DatanodeDescriptor[] targets;
+    // We set the datanode[0~2] as stale, thus should not choose them
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0], BLOCK_SIZE);
+    assertEquals(1, targets.length);
+    assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
+
+    targets = replicator.chooseTarget(filename, 2, dataNodes[0], BLOCK_SIZE);
+    assertEquals(2, targets.length);
+    assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
+    assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
+
+    targets = replicator.chooseTarget(filename, 3, dataNodes[0], BLOCK_SIZE);
+    assertEquals(3, targets.length);
+    assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
+    assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
+    assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
+
+    targets = replicator.chooseTarget(filename, 4, dataNodes[0], BLOCK_SIZE);
+    assertEquals(4, targets.length);
+    assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
+    assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
+    assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
+
+    // reset the shared fixture state touched by this test
+    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(false);
+    for (int i = 0; i < dataNodes.length; i++) {
+      dataNodes[i].setLastUpdate(System.currentTimeMillis());
+    }
+  }
+  
   /**
    * This testcase tests re-replication, when dataNodes[0] is already chosen.
    * So the 1st replica can be placed on random rack. 
@@ -424,6 +514,104 @@ public class TestReplicationPolicy exten
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
   }
   
+  public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
+    // DataNode will send out heartbeat every 15 minutes
+    // In this way, when we have set a datanode as stale,
+    // its heartbeat will not come to refresh its state
+    long heartbeatInterval = 15 * 60;
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
+    // Because the stale interval must be at least 3 times of heartbeatInterval,
+    // we reset the staleInterval value.
+    long longStaleInterval = 3 * heartbeatInterval * 1000;
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        longStaleInterval);
+
+    String[] hosts = new String[] { "host1", "host2", "host3", "host4",
+        "host5", "host6" };
+    String[] racks = new String[] { "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2",
+        "/d2/r3", "/d2/r3" };
+    MiniDFSCluster miniCluster = new MiniDFSCluster(conf, hosts.length, true,
+        racks, hosts);
+    miniCluster.waitActive();
+
+    try {
+      // Step 1. Make two datanodes as stale, check whether the
+      // avoidStaleDataNodesForWrite calculation is correct.
+      // First backdate the last update time of the first two datanodes
+      for (int i = 0; i < 2; i++) {
+        DataNode dn = miniCluster.getDataNodes().get(i);
+        miniCluster.getNameNode().getNamesystem()
+            .getDatanode(dn.dnRegistration)
+            .setLastUpdate(System.currentTimeMillis() - longStaleInterval - 1);
+      }
+      // Instead of waiting, explicitly call heartbeatCheck so that
+      // the stale nodes are detected
+      miniCluster.getNameNode().getNamesystem().heartbeatCheck();
+      int numStaleNodes = miniCluster.getNameNode().getNamesystem()
+          .getNumStaleNodes();
+      assertEquals(2, numStaleNodes);
+      assertTrue(miniCluster.getNameNode().getNamesystem()
+          .isAvoidingStaleDataNodesForWrite());
+      // Call chooseTarget
+      DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
+          .getNamesystem()
+          .getDatanode(miniCluster.getDataNodes().get(0).dnRegistration);
+      BlockPlacementPolicy replicator = miniCluster.getNameNode()
+          .getNamesystem().replicator;
+      DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
+          staleNodeInfo, BLOCK_SIZE);
+      assertEquals(3, targets.length);
+      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+
+      // Step 2. Set more than half of the datanodes as stale
+      for (int i = 0; i < 4; i++) {
+        DataNode dn = miniCluster.getDataNodes().get(i);
+        miniCluster.getNameNode().getNamesystem()
+            .getDatanode(dn.dnRegistration)
+            .setLastUpdate(System.currentTimeMillis() - longStaleInterval - 1);
+      }
+      // Explicitly call heartbeatCheck
+      miniCluster.getNameNode().getNamesystem().heartbeatCheck();
+      numStaleNodes = miniCluster.getNameNode().getNamesystem()
+          .getNumStaleNodes();
+      assertEquals(4, numStaleNodes);
+      // According to our strategy, stale datanodes will be included for writing
+      // to avoid hotspots
+      assertFalse(miniCluster.getNameNode().getNamesystem()
+          .isAvoidingStaleDataNodesForWrite());
+      // Call chooseTarget
+      targets = replicator.chooseTarget(filename, 3, staleNodeInfo, BLOCK_SIZE);
+      assertEquals(3, targets.length);
+      assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
+
+      // Step 3. Set 2 stale datanodes back to healthy nodes,
+      // still have 2 stale nodes
+      for (int i = 2; i < 4; i++) {
+        DataNode dn = miniCluster.getDataNodes().get(i);
+        miniCluster.getNameNode().getNamesystem()
+            .getDatanode(dn.dnRegistration)
+            .setLastUpdate(System.currentTimeMillis());
+      }
+      // Explicitly call heartbeatCheck
+      miniCluster.getNameNode().getNamesystem().heartbeatCheck();
+      numStaleNodes = miniCluster.getNameNode().getNamesystem()
+          .getNumStaleNodes();
+      assertEquals(2, numStaleNodes);
+      assertTrue(miniCluster.getNameNode().getNamesystem()
+          .isAvoidingStaleDataNodesForWrite());
+      // Call chooseTarget
+      targets = replicator.chooseTarget(filename, 3, staleNodeInfo, BLOCK_SIZE);
+      assertEquals(3, targets.length);
+      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+    } finally {
+      miniCluster.shutdown();
+    }
+  }
+  
   /**
    * This testcase tests whether the value returned by 
    * DFSUtil.getInvalidateWorkPctPerIteration() is positive