You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/07/11 22:02:40 UTC
svn commit: r1502341 - in /hadoop/common/branches/branch-1-win:
CHANGES.branch-1-win.txt
src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
Author: cnauroth
Date: Thu Jul 11 20:02:40 2013
New Revision: 1502341
URL: http://svn.apache.org/r1502341
Log:
HDFS-4975. Branch-1-win TestReplicationPolicy failed caused by stale data node handling. Contributed by Xi Fang.
Modified:
hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1502341&r1=1502340&r2=1502341&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Thu Jul 11 20:02:40 2013
@@ -310,6 +310,9 @@ Branch-hadoop-1-win (branched from branc
HADOOP-9722. Branch-1-win TestNativeIO failed caused by Window incompatible
test case. (Xi Fang via cnauroth)
+ HDFS-4975. Branch-1-win TestReplicationPolicy failed caused by stale data
+ node handling. (Xi Fang via cnauroth)
+
Merged from branch-1
HDFS-385. Backport: Add support for an experimental API that allows a
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1502341&r1=1502340&r2=1502341&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Thu Jul 11 20:02:40 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import org.apache.commons.logging.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -42,6 +43,7 @@ public class BlockPlacementPolicyDefault
protected boolean considerLoad;
protected NetworkTopology clusterMap;
protected FSClusterStats stats;
+ private long staleInterval; // interval used to identify stale DataNodes
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
@@ -54,9 +56,13 @@ public class BlockPlacementPolicyDefault
/** {@inheritDoc} */
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
- this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+ this.considerLoad = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
+ this.staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
/** {@inheritDoc} */
@@ -126,9 +132,10 @@ public class BlockPlacementPolicyDefault
if (!clusterMap.contains(writer)) {
writer=null;
}
-
+ boolean avoidStaleNodes = (stats != null && stats
+ .shouldAvoidStaleDataNodesForWrite());
DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
- excludedNodes, blocksize, maxNodesPerRack, results);
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
results.removeAll(chosenNodes);
@@ -136,36 +143,49 @@ public class BlockPlacementPolicyDefault
return getPipeline((writer==null)?localNode:writer,
results.toArray(new DatanodeDescriptor[results.size()]));
}
-
+
+ /* choose <i>numOfReplicas</i> from all data nodes without avoiding stale nodes.
+ * This is an old API.*/
+ protected DatanodeDescriptor chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
+ long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results) {
+ return chooseTarget(numOfReplicas, writer, excludedNodes,
+ blocksize, maxNodesPerRack, results, false);
+ }
+
/* choose <i>numOfReplicas</i> from all data nodes */
protected DatanodeDescriptor chooseTarget(int numOfReplicas,
DatanodeDescriptor writer,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results) {
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
+ int totalReplicasExpected = numOfReplicas + results.size();
int numOfResults = results.size();
boolean newBlock = (numOfResults==0);
if (writer == null && !newBlock) {
writer = results.get(0);
}
-
+ // Keep a copy of original excludedNodes
+ final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
+ new HashMap<Node, Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
writer = chooseLocalNode(writer, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 1) {
chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
@@ -173,27 +193,59 @@ public class BlockPlacementPolicyDefault
if (numOfResults <= 2) {
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} else if (newBlock){
chooseLocalRack(results.get(1), excludedNodes, blocksize,
- maxNodesPerRack, results);
+ maxNodesPerRack, results, avoidStaleNodes);
} else {
chooseLocalRack(writer, excludedNodes, blocksize,
- maxNodesPerRack, results);
+ maxNodesPerRack, results, avoidStaleNodes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
- + numOfReplicas);
+ + (totalReplicasExpected - results.size()) + " to reach "
+ + totalReplicasExpected + "\n"
+ + e.getMessage());
+ if (avoidStaleNodes) {
+ // Retry chooseTarget again, this time not avoiding stale nodes.
+
+ // excludedNodes contains the initial excludedNodes and nodes that were
+ // not chosen because they were stale, decommissioned, etc.
+ // We need to additionally exclude the nodes that were added to the
+ // result list in the successful calls to choose*() above.
+ for (Node node : results) {
+ oldExcludedNodes.put(node, node);
+ }
+ // Set numOfReplicas, since it can get out of sync with the result list
+ // if the NotEnoughReplicasException was thrown in chooseRandom().
+ numOfReplicas = totalReplicasExpected - results.size();
+ return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+ maxNodesPerRack, results, false);
+ }
}
return writer;
}
-
+
+ /* choose <i>localMachine</i> as the target without avoiding stale nodes.
+ * if <i>localMachine</i> is not available,
+ * choose a node on the same rack.
+ * This is an old API
+ * @return the chosen node
+ */
+ protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ return chooseLocalNode(localMachine, excludedNodes, blocksize,
+ maxNodesPerRack, results, false);
+ }
+
/* choose <i>localMachine</i> as the target.
* if <i>localMachine</i> is not available,
* choose a node on the same rack
@@ -201,18 +253,18 @@ public class BlockPlacementPolicyDefault
*/
protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
+ maxNodesPerRack, false, results, avoidStaleNodes)) {
results.add(localMachine);
// add localMachine and related nodes to excludedNode
addToExcludedNodes(localMachine, excludedNodes);
@@ -222,7 +274,7 @@ public class BlockPlacementPolicyDefault
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
/**
* Add <i>localMachine</i> and related nodes to <i>excludedNodes</i>
@@ -235,7 +287,23 @@ public class BlockPlacementPolicyDefault
Node node = excludedNodes.put(localMachine, localMachine);
return node == null?1:0;
}
-
+
+ /* choose one node from the rack that <i>localMachine</i> is on without
+ * avoiding stale nodes.
+ * if no such node is available, choose one node from the rack where
+ * a second replica is on.
+ * if still no such node is available, choose a random node
+ * in the cluster.
+ * This is an old API
+ * @return the chosen node
+ */
+ protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+ return chooseLocalRack(localMachine, excludedNodes,
+ blocksize, maxNodesPerRack, results, false);
+ }
+
/* choose one node from the rack that <i>localMachine</i> is on.
* if no such node is available, choose one node from the rack where
* a second replica is on.
@@ -245,19 +313,19 @@ public class BlockPlacementPolicyDefault
*/
protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local rack
try {
return chooseRandom(
localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -273,20 +341,36 @@ public class BlockPlacementPolicyDefault
try {
return chooseRandom(
newLocal.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
}
}
-
+
+ /* choose <i>numOfReplicas</i> nodes from the racks
+ * that <i>localMachine</i> is NOT on, without avoiding stale nodes.
+ * if not enough nodes are available, choose the remaining ones
+ * from the local rack
+ * This is an old API.
+ */
+ protected void chooseRemoteRack(int numOfReplicas,
+ DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxReplicasPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ chooseRemoteRack(numOfReplicas, localMachine, excludedNodes, blocksize,
+ maxReplicasPerRack, results, false);
+ }
/* choose <i>numOfReplicas</i> nodes from the racks
* that <i>localMachine</i> is NOT on.
* if not enough nodes are available, choose the remaining ones
@@ -297,20 +381,52 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxReplicasPerRack, results);
+ excludedNodes, blocksize, maxReplicasPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
+ maxReplicasPerRack, results,
+ avoidStaleNodes);
}
}
+ /* Randomly choose one target from <i>nodes</i> without avoiding stale nodes.
+ * This is an old API.
+ * @return the chosen node
+ */
+ protected DatanodeDescriptor chooseRandom(String nodes,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ return chooseRandom(nodes, excludedNodes, blocksize, maxNodesPerRack,
+ results, false);
+ }
+
+ /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>
+ * without avoiding stale nodes.
+ * This is an old API.
+ */
+ protected void chooseRandom(int numOfReplicas,
+ String nodes,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ chooseRandom(numOfReplicas, nodes, excludedNodes, blocksize,
+ maxNodesPerRack, results, false);
+ }
+
/* Randomly choose one target from <i>nodes</i>.
* @return the chosen node
*/
@@ -318,7 +434,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -329,7 +446,8 @@ public class BlockPlacementPolicyDefault
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
+ avoidStaleNodes)) {
results.add(chosenNode);
// add chosenNode and related nodes to excludedNode
addToExcludedNodes(chosenNode, excludedNodes);
@@ -350,7 +468,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
@@ -362,7 +481,8 @@ public class BlockPlacementPolicyDefault
if (oldNode == null) {
numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
+ avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
// add chosenNode and related nodes to excludedNode
@@ -393,6 +513,18 @@ public class BlockPlacementPolicyDefault
Node chosenNode) {
// do nothing
}
+
+ /* judge if a node is a good target without avoiding stale nodes.
+ * return true if <i>node</i> has enough space,
+ * does not have too much load, and the rack does not have too many nodes.
+ * This is an old API.
+ */
+ protected boolean isGoodTarget(DatanodeDescriptor node,
+ long blockSize, int maxTargetPerLoc,
+ List<DatanodeDescriptor> results) {
+ return isGoodTarget(node, blockSize, maxTargetPerLoc,
+ this.considerLoad, results, false);
+ }
/* judge if a node is a good target.
* return true if <i>node</i> has enough space,
@@ -400,11 +532,36 @@ public class BlockPlacementPolicyDefault
*/
protected boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerLoc,
- List<DatanodeDescriptor> results) {
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
return isGoodTarget(node, blockSize, maxTargetPerLoc,
- this.considerLoad, results);
+ this.considerLoad, results, avoidStaleNodes);
}
-
+
+ /**
+ * Determine if a node is a good target without avoiding stale nodes.
+ * This is an old API.
+ *
+ * @param node The target node
+ * @param blockSize Size of block
+ * @param maxTargetPerLoc Maximum number of targets per rack. The value of
+ * this parameter depends on the number of racks in
+ * the cluster and total number of replicas for a block
+ * @param considerLoad whether or not to consider load of the target node
+ * @param results A list containing currently chosen nodes. Used to check if
+ * too many nodes has been chosen in the target rack.
+ * @return Return true if <i>node</i> has enough space,
+ * does not have too much load,
+ * and the rack does not have too many nodes.
+ */
+ protected boolean isGoodTarget(DatanodeDescriptor node,
+ long blockSize, int maxTargetPerLoc,
+ boolean considerLoad,
+ List<DatanodeDescriptor> results) {
+ return isGoodTarget(node, blockSize, maxTargetPerLoc,
+ considerLoad, results, false);
+ }
+
/**
* Determine if a node is a good target.
*
@@ -424,7 +581,8 @@ public class BlockPlacementPolicyDefault
protected boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerLoc,
boolean considerLoad,
- List<DatanodeDescriptor> results) {
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
Log logr = FSNamesystem.LOG;
// check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
@@ -433,6 +591,14 @@ public class BlockPlacementPolicyDefault
return false;
}
+ if (avoidStaleNodes) {
+ if (node.isStale(this.staleInterval)) {
+ logr.debug("Node "+NodeBase.getPath(node)+
+ " is not chosen because the node is (being) stale");
+ return false;
+ }
+ }
+
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine