You are viewing a plain text version of this content. The canonical HTML version is available from the original mailing-list archive (the hyperlink was lost in this plain-text extraction).
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/07/11 22:02:40 UTC

svn commit: r1502341 - in /hadoop/common/branches/branch-1-win: CHANGES.branch-1-win.txt src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java

Author: cnauroth
Date: Thu Jul 11 20:02:40 2013
New Revision: 1502341

URL: http://svn.apache.org/r1502341
Log:
HDFS-4975. Branch-1-win TestReplicationPolicy failed caused by stale data node handling. Contributed by Xi Fang.

Modified:
    hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
    hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java

Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1502341&r1=1502340&r2=1502341&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Thu Jul 11 20:02:40 2013
@@ -310,6 +310,9 @@ Branch-hadoop-1-win (branched from branc
     HADOOP-9722. Branch-1-win TestNativeIO failed caused by Window incompatible
     test case. (Xi Fang via cnauroth)
 
+    HDFS-4975. Branch-1-win TestReplicationPolicy failed caused by stale data
+    node handling. (Xi Fang via cnauroth)
+
   Merged from branch-1
 
     HDFS-385. Backport: Add support for an experimental API that allows a

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1502341&r1=1502340&r2=1502341&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Thu Jul 11 20:02:40 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import org.apache.commons.logging.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -42,6 +43,7 @@ public class BlockPlacementPolicyDefault
   protected boolean considerLoad; 
   protected NetworkTopology clusterMap;
   protected FSClusterStats stats;
+  private long staleInterval;   // interval used to identify stale DataNodes
 
   BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
                            NetworkTopology clusterMap) {
@@ -54,9 +56,13 @@ public class BlockPlacementPolicyDefault
   /** {@inheritDoc} */
   public void initialize(Configuration conf,  FSClusterStats stats,
                          NetworkTopology clusterMap) {
-    this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+    this.considerLoad = conf.getBoolean(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.staleInterval = conf.getLong(
+            DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+            DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
   /** {@inheritDoc} */
@@ -126,9 +132,10 @@ public class BlockPlacementPolicyDefault
     if (!clusterMap.contains(writer)) {
       writer=null;
     }
-      
+    boolean avoidStaleNodes = (stats != null && stats
+            .shouldAvoidStaleDataNodesForWrite());
     DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, maxNodesPerRack, results);
+                                                excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
       
     results.removeAll(chosenNodes);
       
@@ -136,36 +143,49 @@ public class BlockPlacementPolicyDefault
     return getPipeline((writer==null)?localNode:writer,
                        results.toArray(new DatanodeDescriptor[results.size()]));
   }
-    
+
+  /* choose <i>numOfReplicas</i> from all data nodes without avoiding stale nodes
+   * This is an old API.*/
+  protected DatanodeDescriptor chooseTarget(int numOfReplicas,
+	      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
+	      long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results) {
+    return chooseTarget(numOfReplicas, writer, excludedNodes,
+                        blocksize, maxNodesPerRack, results, false);
+  }
+
   /* choose <i>numOfReplicas</i> from all data nodes */
   protected DatanodeDescriptor chooseTarget(int numOfReplicas,
                                           DatanodeDescriptor writer,
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
+                                          List<DatanodeDescriptor> results,
+                                          boolean avoidStaleNodes) {
       
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
+    int totalReplicasExpected = numOfReplicas + results.size();
       
     int numOfResults = results.size();
     boolean newBlock = (numOfResults==0);
     if (writer == null && !newBlock) {
       writer = results.get(0);
     }
-      
+    // Keep a copy of original excludedNodes
+    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
+        new HashMap<Node, Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
         writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
+                                 blocksize, maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 1) {
         chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
+                         blocksize, maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
@@ -173,27 +193,59 @@ public class BlockPlacementPolicyDefault
       if (numOfResults <= 2) {
         if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
           chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
+                           blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } else if (newBlock){
           chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, avoidStaleNodes);
         } else {
           chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, avoidStaleNodes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+                   maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
-               + numOfReplicas);
+          + (totalReplicasExpected - results.size()) + " to reach "
+          + totalReplicasExpected + "\n"
+          + e.getMessage());
+      if (avoidStaleNodes) {
+        // Retry chooseTarget again, this time not avoiding stale nodes.
+
+        // excludedNodes contains the initial excludedNodes and nodes that were
+        // not chosen because they were stale, decommissioned, etc.
+        // We need to additionally exclude the nodes that were added to the
+        // result list in the successful calls to choose*() above.
+        for (Node node : results) {
+          oldExcludedNodes.put(node, node);
+        }
+        // Set numOfReplicas, since it can get out of sync with the result list
+        // if the NotEnoughReplicasException was thrown in chooseRandom().
+        numOfReplicas = totalReplicasExpected - results.size();
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+            maxNodesPerRack, results, false);
+      }
     }
     return writer;
   }
-    
+
+  /* choose <i>localMachine</i> as the target without avoiding stale nodes.
+   * if <i>localMachine</i> is not available, 
+   * choose a node on the same rack.
+   * This is an old API
+   * @return the chosen node
+   */
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+      return chooseLocalNode(localMachine, excludedNodes, blocksize, 
+                             maxNodesPerRack, results, false);
+  }
+
   /* choose <i>localMachine</i> as the target.
    * if <i>localMachine</i> is not available, 
    * choose a node on the same rack
@@ -201,18 +253,18 @@ public class BlockPlacementPolicyDefault
    */
   protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results)
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+                          blocksize, maxNodesPerRack, results, avoidStaleNodes);
       
     // otherwise try local machine first
     Node oldNode = excludedNodes.put(localMachine, localMachine);
     if (oldNode == null) { // was not in the excluded list
       if (isGoodTarget(localMachine, blocksize,
-                       maxNodesPerRack, false, results)) {
+                       maxNodesPerRack, false, results, avoidStaleNodes)) {
         results.add(localMachine);
         // add localMachine and related nodes to excludedNode
         addToExcludedNodes(localMachine, excludedNodes);
@@ -222,7 +274,7 @@ public class BlockPlacementPolicyDefault
       
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
+                           blocksize, maxNodesPerRack, results, avoidStaleNodes);
   }
   /**
     * Add <i>localMachine</i> and related nodes to <i>excludedNodes</i>
@@ -235,7 +287,23 @@ public class BlockPlacementPolicyDefault
     Node node = excludedNodes.put(localMachine, localMachine);
     return node == null?1:0;
   }
-    
+
+  /* choose one node from the rack that <i>localMachine</i> is on without
+   * avoiding stale nodes.
+   * if no such node is available, choose one node from the rack where
+   * a second replica is on.
+   * if still no such node is available, choose a random node 
+   * in the cluster.
+   * This is an old API
+   * @return the chosen node
+   */
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+    return chooseLocalRack(localMachine, excludedNodes, 
+                           blocksize, maxNodesPerRack, results, false);
+  }
+
   /* choose one node from the rack that <i>localMachine</i> is on.
    * if no such node is available, choose one node from the rack where
    * a second replica is on.
@@ -245,19 +313,19 @@ public class BlockPlacementPolicyDefault
    */
   protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results)
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+                          blocksize, maxNodesPerRack, results, avoidStaleNodes);
     }
       
     // choose one from the local rack
     try {
       return chooseRandom(
                           localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
+                          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -273,20 +341,36 @@ public class BlockPlacementPolicyDefault
         try {
           return chooseRandom(
                               newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+                              excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+                              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+                            blocksize, maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
-    
+
+  /* choose <i>numOfReplicas</i> nodes from the racks
+   * that <i>localMachine</i> is NOT on, without avoiding stale nodes.
+   * if not enough nodes are available, choose the remaining ones 
+   * from the local rack
+   * This is an old API.
+   */
+  protected void chooseRemoteRack(int numOfReplicas,
+                                DatanodeDescriptor localMachine,
+                                HashMap<Node, Node> excludedNodes,
+                                long blocksize,
+                                int maxReplicasPerRack,
+                                List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+        chooseRemoteRack(numOfReplicas, localMachine, excludedNodes, blocksize, 
+                         maxReplicasPerRack, results, false);
+  }
   /* choose <i>numOfReplicas</i> nodes from the racks 
    * that <i>localMachine</i> is NOT on.
    * if not enough nodes are available, choose the remaining ones 
@@ -297,20 +381,52 @@ public class BlockPlacementPolicyDefault
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
+                                List<DatanodeDescriptor> results,
+                                boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
       chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
+                   excludedNodes, blocksize, maxReplicasPerRack, results,
+                   avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
+                   maxReplicasPerRack, results,
+                   avoidStaleNodes);
     }
   }
 
+  /* Randomly choose one target from <i>nodes</i> without avoiding stale nodes.
+   * This is an old API.
+   * @return the chosen node
+   */
+  protected DatanodeDescriptor chooseRandom(String nodes,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+       return chooseRandom(nodes, excludedNodes, blocksize, maxNodesPerRack,
+           results, false);
+  }
+
+  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>
+   * without avoiding stale nodes.
+   * This is an old API.
+   */
+  protected void chooseRandom(int numOfReplicas,
+                            String nodes,
+                            HashMap<Node, Node> excludedNodes,
+                            long blocksize,
+                            int maxNodesPerRack,
+                            List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+        chooseRandom(numOfReplicas, nodes, excludedNodes, blocksize, 
+                     maxNodesPerRack, results, false);
+  }
+
   /* Randomly choose one target from <i>nodes</i>.
    * @return the chosen node
    */
@@ -318,7 +434,8 @@ public class BlockPlacementPolicyDefault
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
+                                          List<DatanodeDescriptor> results,
+                                          boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     int numOfAvailableNodes =
       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -329,7 +446,8 @@ public class BlockPlacementPolicyDefault
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
       if (oldNode == null) { // chosendNode was not in the excluded list
         numOfAvailableNodes--;
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
+                avoidStaleNodes)) {
           results.add(chosenNode);
           // add chosenNode and related nodes to excludedNode
           addToExcludedNodes(chosenNode, excludedNodes);
@@ -350,7 +468,8 @@ public class BlockPlacementPolicyDefault
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
+                            List<DatanodeDescriptor> results,
+                            boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
       
     int numOfAvailableNodes =
@@ -362,7 +481,8 @@ public class BlockPlacementPolicyDefault
       if (oldNode == null) {
         numOfAvailableNodes--;
 
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
+                avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
           // add chosenNode and related nodes to excludedNode
@@ -393,6 +513,18 @@ public class BlockPlacementPolicyDefault
       Node chosenNode) {
     // do nothing
   }
+
+  /* judge if a node is a good target without avoiding stale nodes.
+   * return true if <i>node</i> has enough space, 
+   * does not have too much load, and the rack does not have too many nodes.
+   * This is an old API.
+   */
+  protected boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                            this.considerLoad, results, false);
+  }
   
   /* judge if a node is a good target.
    * return true if <i>node</i> has enough space, 
@@ -400,11 +532,36 @@ public class BlockPlacementPolicyDefault
    */
   protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerLoc,
-                               List<DatanodeDescriptor> results) {
+                               List<DatanodeDescriptor> results,
+                               boolean avoidStaleNodes) {
     return isGoodTarget(node, blockSize, maxTargetPerLoc,
-                        this.considerLoad, results);
+                        this.considerLoad, results, avoidStaleNodes);
   }
-   
+
+  /**
+   * Determine if a node is a good target without avoiding stale nodes.
+   * This is an old API.
+   *
+   * @param node The target node
+   * @param blockSize Size of block
+   * @param maxTargetPerLoc Maximum number of targets per rack. The value of
+   *                        this parameter depends on the number of racks in
+   *                        the cluster and total number of replicas for a block
+   * @param considerLoad whether or not to consider load of the target node
+   * @param results A list containing currently chosen nodes. Used to check if
+   *                too many nodes has been chosen in the target rack.
+   * @return Return true if <i>node</i> has enough space,
+   *         does not have too much load,
+   *         and the rack does not have too many nodes.
+   */
+  protected boolean isGoodTarget(DatanodeDescriptor node,
+          long blockSize, int maxTargetPerLoc,
+          boolean considerLoad,
+          List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                        considerLoad, results, false);
+  }
+
   /**
    * Determine if a node is a good target. 
    * 
@@ -424,7 +581,8 @@ public class BlockPlacementPolicyDefault
   protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerLoc,
                                boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
+                               List<DatanodeDescriptor> results,
+                               boolean avoidStaleNodes) {
     Log logr = FSNamesystem.LOG;
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
@@ -433,6 +591,14 @@ public class BlockPlacementPolicyDefault
       return false;
     }
 
+    if (avoidStaleNodes) {
+      if (node.isStale(this.staleInterval)) {
+        logr.debug("Node "+NodeBase.getPath(node)+
+            " is not chosen because the node is (being) stale");
+        return false;
+      }
+    }
+
     long remaining = node.getRemaining() - 
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine