You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/10/11 20:08:29 UTC
svn commit: r1397211 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/or...
Author: suresh
Date: Thu Oct 11 18:08:27 2012
New Revision: 1397211
URL: http://svn.apache.org/viewvc?rev=1397211&view=rev
Log:
HDFS-3912. Detect and avoid stale datanodes for writes. Contributed by Jing Zhao
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Oct 11 18:08:27 2012
@@ -13,10 +13,6 @@ Trunk (Unreleased)
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
with 4-layer network topology. (Junping Du via szetszwo)
- HDFS-3703. Datanodes are marked stale if heartbeat is not received in
- configured timeout and are selected as the last location to read from.
- (Jing Zhao via suresh)
-
IMPROVEMENTS
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
@@ -238,6 +234,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
(Jaimin D Jetly and Jing Zhao via szetszwo)
+ HDFS-3912. Detect and avoid stale datanodes for writes.
+ (Jing Zhao via suresh)
+
IMPROVEMENTS
HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@@ -349,6 +348,11 @@ Release 2.0.2-alpha - 2012-09-07
HDFS-2793. Add an admin command to trigger an edit log roll. (todd)
+ HDFS-3703. Datanodes are marked stale if heartbeat is not received in
+ configured timeout and are selected as the last location to read from.
+ (Jing Zhao via suresh)
+
+
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Oct 11 18:08:27 2012
@@ -180,9 +180,21 @@ public class DFSConfigKeys extends Commo
// Whether to enable datanode's stale state detection and usage
public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+ // Whether to enable datanode's stale state detection and usage
+ public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+ public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
// The default value of the time interval for marking datanodes as stale
public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
- public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
+ public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+ // The stale interval cannot be too small since otherwise this may cause too frequent churn on stale states.
+ // This value uses the times of heartbeat interval to define the minimum value for stale interval.
+ public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+ public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+
+ // When the number of datanodes marked as stale reaches this certain ratio,
+ // stop avoiding writing to stale nodes so as to prevent causing hotspots.
+ public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+ public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
// Replication monitoring related keys
public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Thu Oct 11 18:08:27 2012
@@ -62,6 +62,8 @@ public class BlockPlacementPolicyDefault
protected NetworkTopology clusterMap;
private FSClusterStats stats;
protected long heartbeatInterval; // interval for DataNode heartbeats
+ private long staleInterval; // interval used to identify stale DataNodes
+
/**
* A miss of that many heartbeats is tolerated for replica deletion policy.
*/
@@ -78,7 +80,8 @@ public class BlockPlacementPolicyDefault
@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
- this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
+ this.considerLoad = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
this.heartbeatInterval = conf.getLong(
@@ -87,6 +90,9 @@ public class BlockPlacementPolicyDefault
this.tolerateHeartbeatMultiplier = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
+ this.staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
protected ThreadLocal<StringBuilder> threadLocalBuilder =
@@ -155,9 +161,10 @@ public class BlockPlacementPolicyDefault
writer=null;
}
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
- excludedNodes, blocksize,
- maxNodesPerRack, results);
+ boolean avoidStaleNodes = (stats != null
+ && stats.isAvoidingStaleDataNodesForWrite());
+ DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (!returnChosenNodes) {
results.removeAll(chosenNodes);
}
@@ -173,8 +180,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results) {
-
+ List<DatanodeDescriptor> results,
+ final boolean avoidStaleNodes) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
@@ -185,18 +192,21 @@ public class BlockPlacementPolicyDefault
if (writer == null && !newBlock) {
writer = results.get(0);
}
-
+
+ // Keep a copy of original excludedNodes
+ final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
+ new HashMap<Node, Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
- writer = chooseLocalNode(writer, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ writer = chooseLocalNode(writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 1) {
- chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
+ chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
@@ -204,24 +214,36 @@ public class BlockPlacementPolicyDefault
if (numOfResults <= 2) {
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack,
+ results, avoidStaleNodes);
} else if (newBlock){
chooseLocalRack(results.get(1), excludedNodes, blocksize,
- maxNodesPerRack, results);
+ maxNodesPerRack, results, avoidStaleNodes);
} else {
- chooseLocalRack(writer, excludedNodes, blocksize,
- maxNodesPerRack, results);
+ chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
LOG.warn("Not able to place enough replicas, still in need of "
+ numOfReplicas + " to reach " + totalReplicasExpected + "\n"
+ e.getMessage());
+ if (avoidStaleNodes) {
+ // excludedNodes now has - initial excludedNodes, any nodes that were
+ // chosen and nodes that were tried but were not chosen because they
+ // were stale, decommissioned or for any other reason a node is not
+ // chosen for write. Retry again, this time not avoiding stale nodes
+ for (Node node : results) {
+ oldExcludedNodes.put(node, node);
+ }
+ return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+ maxNodesPerRack, results, false);
+ }
}
return writer;
}
@@ -236,26 +258,27 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
if (preferLocalNode) {
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
- if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
+ if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
+ results, avoidStaleNodes)) {
results.add(localMachine);
return localMachine;
}
}
}
// try a node on local rack
- return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseLocalRack(localMachine, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
/* choose one node from the rack that <i>localMachine</i> is on.
@@ -270,19 +293,19 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local rack
try {
- return chooseRandom(
- localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -296,18 +319,17 @@ public class BlockPlacementPolicyDefault
}
if (newLocal != null) {
try {
- return chooseRandom(
- newLocal.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@@ -323,17 +345,19 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
- chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxReplicasPerRack, results);
+ chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
+ excludedNodes, blocksize, maxReplicasPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
+ maxReplicasPerRack, results, avoidStaleNodes);
}
}
@@ -345,7 +369,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -363,7 +388,8 @@ public class BlockPlacementPolicyDefault
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ if (isGoodTarget(chosenNode, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes)) {
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
@@ -390,7 +416,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
@@ -409,7 +436,8 @@ public class BlockPlacementPolicyDefault
if (oldNode == null) {
numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ if (isGoodTarget(chosenNode, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
@@ -451,9 +479,10 @@ public class BlockPlacementPolicyDefault
*/
private boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
- List<DatanodeDescriptor> results) {
- return isGoodTarget(node, blockSize, maxTargetPerRack,
- this.considerLoad, results);
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
+ return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
+ results, avoidStaleNodes);
}
/**
@@ -466,7 +495,8 @@ public class BlockPlacementPolicyDefault
* the cluster and total number of replicas for a block
* @param considerLoad whether or not to consider load of the target node
* @param results A list containing currently chosen nodes. Used to check if
- * too many nodes has been chosen in the target rack.
+ * too many nodes have been chosen in the target rack.
+ * @param avoidStaleNodes Whether or not to avoid choosing stale nodes
* @return Return true if <i>node</i> has enough space,
* does not have too much load,
* and the rack does not have too many nodes.
@@ -474,7 +504,8 @@ public class BlockPlacementPolicyDefault
protected boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
- List<DatanodeDescriptor> results) {
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
// check if the node is (being) decommissed
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
if(LOG.isDebugEnabled()) {
@@ -485,6 +516,17 @@ public class BlockPlacementPolicyDefault
return false;
}
+ if (avoidStaleNodes) {
+ if (node.isStale(this.staleInterval)) {
+ if (LOG.isDebugEnabled()) {
+ threadLocalBuilder.get().append(node.toString()).append(": ")
+ .append("Node ").append(NodeBase.getPath(node))
+ .append(" is not chosen because the node is staled ");
+ }
+ return false;
+ }
+ }
+
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Thu Oct 11 18:08:27 2012
@@ -64,23 +64,20 @@ public class BlockPlacementPolicyWithNod
* @return the chosen node
*/
@Override
- protected DatanodeDescriptor chooseLocalNode(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
+ maxNodesPerRack, false, results, avoidStaleNodes)) {
results.add(localMachine);
// Nodes under same nodegroup should be excluded.
addNodeGroupToExcludedNodes(excludedNodes,
@@ -92,13 +89,13 @@ public class BlockPlacementPolicyWithNod
// try a node on local node group
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (chosenNode != null) {
return chosenNode;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
@Override
@@ -119,17 +116,15 @@ public class BlockPlacementPolicyWithNod
}
@Override
- protected DatanodeDescriptor chooseLocalRack(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
+ protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results,
+ avoidStaleNodes);
}
// choose one from the local rack, but off-nodegroup
@@ -137,7 +132,8 @@ public class BlockPlacementPolicyWithNod
return chooseRandom(NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize,
- maxNodesPerRack, results);
+ maxNodesPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -151,39 +147,39 @@ public class BlockPlacementPolicyWithNod
}
if (newLocal != null) {
try {
- return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@Override
protected void chooseRemoteRack(int numOfReplicas,
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
+ DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+ long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
- chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
- localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxReplicasPerRack, results);
+ chooseRandom(
+ numOfReplicas,
+ "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxReplicasPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
- chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
- localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
+ chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+ localMachine.getNetworkLocation(), excludedNodes, blocksize,
+ maxReplicasPerRack, results, avoidStaleNodes);
}
}
@@ -193,19 +189,22 @@ public class BlockPlacementPolicyWithNod
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
- DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
- int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+ private DatanodeDescriptor chooseLocalNodeGroup(
+ NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local node group
try {
- return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -219,17 +218,19 @@ public class BlockPlacementPolicyWithNod
}
if (newLocal != null) {
try {
- return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results,
+ avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Thu Oct 11 18:08:27 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
@@ -88,8 +89,8 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
-
private final HeartbeatManager heartbeatManager;
+ private Daemon decommissionthread = null;
/**
* Stores the datanode -> block map.
@@ -127,28 +128,33 @@ public class DatanodeManager {
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
+ /** Whether or not to check stale DataNodes for read/write */
+ private final boolean checkForStaleDataNodes;
+
+ /** The interval for judging stale DataNodes for read/write */
+ private final long staleInterval;
+
+ /** Whether or not to avoid using stale DataNodes for writing */
+ private volatile boolean avoidStaleDataNodesForWrite;
+
+ /** The number of stale DataNodes */
+ private volatile int numStaleNodes;
+
/**
* Whether or not this cluster has ever consisted of more than 1 rack,
* according to the NetworkTopology.
*/
private boolean hasClusterEverBeenMultiRack = false;
- /** Whether or not to check the stale datanodes */
- private volatile boolean checkForStaleNodes;
- /** The time interval for detecting stale datanodes */
- private volatile long staleInterval;
-
- DatanodeManager(final BlockManager blockManager,
- final Namesystem namesystem, final Configuration conf
- ) throws IOException {
+ DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
+ final Configuration conf) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
Class<? extends NetworkTopology> networkTopologyClass =
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
NetworkTopology.class, NetworkTopology.class);
- networktopology = (NetworkTopology) ReflectionUtils.newInstance(
- networkTopologyClass, conf);
+ networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
@@ -181,25 +187,69 @@ public class DatanodeManager {
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
- // set the value of stale interval based on configuration
- this.checkForStaleNodes = conf.getBoolean(
+
+ checkForStaleDataNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
- if (this.checkForStaleNodes) {
- this.staleInterval = conf.getLong(
- DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
- if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
- LOG.warn("The given interval for marking stale datanode = "
- + this.staleInterval + ", which is smaller than the default value "
- + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
- + ".");
- }
+
+ staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+ avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
+ checkForStaleDataNodes);
+ }
+
+ private static long getStaleIntervalFromConf(Configuration conf,
+ long heartbeatExpireInterval) {
+ long staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+ Preconditions.checkArgument(staleInterval > 0,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
+ " = '" + staleInterval + "' is invalid. " +
+ "It should be a positive non-zero value.");
+
+ final long heartbeatIntervalSeconds = conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+ // The stale interval value cannot be smaller than
+ // 3 times the heartbeat interval
+ final long minStaleInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
+ * heartbeatIntervalSeconds * 1000;
+ if (staleInterval < minStaleInterval) {
+ LOG.warn("The given interval for marking stale datanode = "
+ + staleInterval + ", which is less than "
+ + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+ + " heartbeat intervals. This may cause too frequent changes of "
+ + "stale states of DataNodes since a heartbeat msg may be missing "
+ + "due to temporary short-term failures. Reset stale interval to "
+ + minStaleInterval + ".");
+ staleInterval = minStaleInterval;
+ }
+ if (staleInterval > heartbeatExpireInterval) {
+ LOG.warn("The given interval for marking stale datanode = "
+ + staleInterval + ", which is larger than heartbeat expire interval "
+ + heartbeatExpireInterval + ".");
}
+ return staleInterval;
}
-
- private Daemon decommissionthread = null;
-
+
+ static boolean getAvoidStaleForWriteFromConf(Configuration conf,
+ boolean checkForStale) {
+ boolean avoid = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+ boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
+ if (!checkForStale && avoid) {
+ LOG.warn("Cannot set "
+ + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+ + " as false while setting "
+ + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+ + " as true.");
+ }
+ return avoidStaleDataNodesForWrite;
+ }
+
void activate(final Configuration conf) {
final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
this.decommissionthread = new Daemon(dm.new Monitor(
@@ -253,9 +303,10 @@ public class DatanodeManager {
client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
}
- Comparator<DatanodeInfo> comparator = checkForStaleNodes ?
- new DFSUtil.DecomStaleComparator(staleInterval) :
- DFSUtil.DECOM_COMPARATOR;
+ Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ?
+ new DFSUtil.DecomStaleComparator(staleInterval) :
+ DFSUtil.DECOM_COMPARATOR;
+
for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned/stale datanodes to the bottom
@@ -723,7 +774,7 @@ public class DatanodeManager {
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
- private void refreshDatanodes() throws IOException {
+ private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node)) {
@@ -782,7 +833,61 @@ public class DatanodeManager {
namesystem.readUnlock();
}
}
+
+ /* Getter and Setter for stale DataNodes related attributes */
+
+ /**
+ * @return whether or not to avoid writing to stale datanodes
+ */
+ public boolean isAvoidingStaleDataNodesForWrite() {
+ return avoidStaleDataNodesForWrite;
+ }
+ /**
+ * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}.
+ * The HeartbeatManager disables avoidStaleDataNodesForWrite when more than
+ * half of the DataNodes are marked as stale.
+ *
+ * @param avoidStaleDataNodesForWrite
+ * The value to set to
+ * {@link DatanodeManager#avoidStaleDataNodesForWrite}
+ */
+ void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
+ this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+ }
+
+ /**
+ * @return Whether or not to check stale DataNodes for R/W
+ */
+ boolean isCheckingForStaleDataNodes() {
+ return checkForStaleDataNodes;
+ }
+
+ /**
+ * @return The time interval used to mark DataNodes as stale.
+ */
+ long getStaleInterval() {
+ return staleInterval;
+ }
+
+ /**
+ * Set the number of current stale DataNodes. The HeartbeatManager computes this
+ * number based on DataNodes' heartbeats.
+ *
+ * @param numStaleNodes
+ * The number of stale DataNodes to be set.
+ */
+ void setNumStaleNodes(int numStaleNodes) {
+ this.numStaleNodes = numStaleNodes;
+ }
+
+ /**
+ * @return The current number of stale DataNodes (detected by
+ * HeartbeatManager).
+ */
+ int getNumStaleNodes() {
+ return this.numStaleNodes;
+ }
/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
@@ -961,7 +1066,7 @@ public class DatanodeManager {
return nodes;
}
- private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+ private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdate(0);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Thu Oct 11 18:08:27 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
+import com.google.common.base.Preconditions;
+
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
@@ -54,18 +56,48 @@ class HeartbeatManager implements Datano
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
-
+ /**
+ * The initial setting of end user which indicates whether or not to avoid
+ * writing to stale datanodes.
+ */
+ private final boolean initialAvoidWriteStaleNodes;
+ /**
+ * When the ratio of stale datanodes reaches this number, stop avoiding
+ * writing to stale datanodes, i.e., continue using stale nodes for writing.
+ */
+ private final float ratioUseStaleDataNodesForWrite;
+
final Namesystem namesystem;
final BlockManager blockManager;
- HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
- final Configuration conf) {
- this.heartbeatRecheckInterval = conf.getInt(
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
-
+ HeartbeatManager(final Namesystem namesystem,
+ final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
+ boolean checkStaleNodes = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+ long recheckInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
+ long staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
+ this.initialAvoidWriteStaleNodes = DatanodeManager
+ .getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
+ this.ratioUseStaleDataNodesForWrite = conf.getFloat(
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+ Preconditions.checkArgument(
+ (ratioUseStaleDataNodesForWrite > 0 &&
+ ratioUseStaleDataNodesForWrite <= 1.0f),
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
+ " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
+ "It should be a positive non-zero float value, not greater than 1.0f.");
+
+ this.heartbeatRecheckInterval = (checkStaleNodes
+ && initialAvoidWriteStaleNodes
+ && staleInterval < recheckInterval) ? staleInterval : recheckInterval;
}
void activate(Configuration conf) {
@@ -210,16 +242,39 @@ class HeartbeatManager implements Datano
if (namesystem.isInSafeMode()) {
return;
}
+ boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
+ // check the number of stale nodes
+ int numOfStaleNodes = 0;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
- if (dm.isDatanodeDead(d)) {
+ if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
- break;
+ if (!checkStaleNodes) {
+ break;
+ }
+ }
+ if (checkStaleNodes &&
+ d.isStale(dm.getStaleInterval())) {
+ numOfStaleNodes++;
+ }
+ }
+
+ // Change whether to avoid using stale datanodes for writing
+ // based on proportion of stale datanodes
+ if (checkStaleNodes) {
+ dm.setNumStaleNodes(numOfStaleNodes);
+ if (numOfStaleNodes >
+ datanodes.size() * ratioUseStaleDataNodesForWrite) {
+ dm.setAvoidStaleDataNodesForWrite(false);
+ } else {
+ if (this.initialAvoidWriteStaleNodes) {
+ dm.setAvoidStaleDataNodesForWrite(true);
+ }
}
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Oct 11 18:08:27 2012
@@ -251,7 +251,7 @@ public class DataNode extends Configured
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
private DNConf dnConf;
- private boolean heartbeatsDisabledForTests = false;
+ private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
DataNodeMetrics metrics;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Thu Oct 11 18:08:27 2012
@@ -32,8 +32,16 @@ public interface FSClusterStats {
* @return a count of the total number of block transfers and block
* writes that are currently occurring on the cluster.
*/
-
- public int getTotalLoad() ;
+ public int getTotalLoad();
+
+ /**
+ * Indicate whether or not the cluster is currently avoiding the use of
+ * stale DataNodes for writing.
+ *
+ * @return True if the cluster is currently avoiding using stale DataNodes
+ * for writing targets, and false otherwise.
+ */
+ public boolean isAvoidingStaleDataNodesForWrite();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Oct 11 18:08:27 2012
@@ -5539,4 +5539,10 @@ public class FSNamesystem implements Nam
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker;
}
+
+ @Override
+ public boolean isAvoidingStaleDataNodesForWrite() {
+ return this.blockManager.getDatanodeManager()
+ .isAvoidingStaleDataNodesForWrite();
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Thu Oct 11 18:08:27 2012
@@ -988,12 +988,28 @@
<name>dfs.namenode.check.stale.datanode</name>
<value>false</value>
<description>
- Indicate whether or not to check "stale" datanodes whose
- heartbeat messages have not been received by the namenode
- for more than a specified time interval. If this configuration
- parameter is set as true, the stale datanodes will be moved to
- the end of the target node list for reading. The writing will
- also try to avoid stale nodes.
+ Indicate whether or not to check "stale" datanodes whose
+ heartbeat messages have not been received by the namenode
+ for more than a specified time interval. If this configuration
+ parameter is set as true, the system will keep track
+ of the number of stale datanodes. The stale datanodes will be
+ moved to the end of the node list returned for reading. See
+ dfs.namenode.avoid.write.stale.datanode for details on how this
+ affects writes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.avoid.write.stale.datanode</name>
+ <value>false</value>
+ <description>
+ Indicate whether or not to avoid writing to "stale" datanodes whose
+ heartbeat messages have not been received by the namenode
+ for more than a specified time interval. If this configuration
+ parameter and dfs.namenode.check.stale.datanode are both set as true,
+ the writing will avoid using stale datanodes unless a high number
+ of datanodes are marked as stale. See
+ dfs.namenode.write.stale.datanode.ratio for details.
</description>
</property>
@@ -1001,10 +1017,24 @@
<name>dfs.namenode.stale.datanode.interval</name>
<value>30000</value>
<description>
- Default time interval for marking a datanode as "stale", i.e., if
- the namenode has not received heartbeat msg from a datanode for
- more than this time interval, the datanode will be marked and treated
- as "stale" by default.
+ Default time interval for marking a datanode as "stale", i.e., if
+ the namenode has not received heartbeat msg from a datanode for
+ more than this time interval, the datanode will be marked and treated
+ as "stale" by default. The stale interval cannot be too small since
+ otherwise this may cause too frequent change of stale states.
+ We thus set a minimum stale interval value (the default value is 3 times
+ the heartbeat interval) and guarantee that the stale interval cannot be less
+ than the minimum value.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.write.stale.datanode.ratio</name>
+ <value>0.5f</value>
+ <description>
+ When the ratio of the number of datanodes marked as stale to the total
+ number of datanodes is greater than this ratio, stop avoiding writing
+ to stale nodes so as to prevent causing hotspots.
</description>
</property>
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1397211&r1=1397210&r2=1397211&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Thu Oct 11 18:08:27 2012
@@ -38,9 +38,12 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.Time;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -55,6 +58,9 @@ public class TestReplicationPolicy {
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[];
+ // The interval for marking a datanode as stale,
+ private static long staleInterval =
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@Rule
public ExpectedException exception = ExpectedException.none();
@@ -77,6 +83,8 @@ public class TestReplicationPolicy {
"test.build.data", "build/test/data"), "dfs/");
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
new File(baseDir, "name").getPath());
+ // Enable the checking for stale datanodes in the beginning
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
DFSTestUtil.formatNameNode(conf);
namenode = new NameNode(conf);
@@ -229,7 +237,7 @@ public class TestReplicationPolicy {
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
- for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+ for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@@ -369,6 +377,202 @@ public class TestReplicationPolicy {
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
}
+
+ private boolean containsWithinRange(DatanodeDescriptor target,
+ DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
+ assert startIndex >= 0 && startIndex < nodes.length;
+ assert endIndex >= startIndex && endIndex < nodes.length;
+ for (int i = startIndex; i <= endIndex; i++) {
+ if (nodes[i].equals(target)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Test
+ public void testChooseTargetWithStaleNodes() throws Exception {
+ // Enable avoiding writing to stale datanodes
+ namenode.getNamesystem().getBlockManager().getDatanodeManager()
+ .setAvoidStaleDataNodesForWrite(true);
+ // Set dataNodes[0] as stale
+ dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
+
+ DatanodeDescriptor[] targets;
+ // We set the datanode[0] as stale, thus should choose datanode[1] since
+ // datanode[1] is on the same rack with datanode[0] (writer)
+ targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertEquals(targets[0], dataNodes[1]);
+
+ HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
+ excludedNodes.put(dataNodes[1], dataNodes[1]);
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+ targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
+ BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+
+ // reset
+ namenode.getNamesystem().getBlockManager().getDatanodeManager()
+ .setAvoidStaleDataNodesForWrite(false);
+ dataNodes[0].setLastUpdate(Time.now());
+ }
+
+ /**
+ * In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale,
+ * and when the number of replicas is less than or equal to 3, all the healthy
+ * datanodes should be returned by the chooseTarget method. When the number
+ * of replicas is 4, a stale node should be included.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testChooseTargetWithHalfStaleNodes() throws Exception {
+ // Enable stale datanodes checking
+ namenode.getNamesystem().getBlockManager().getDatanodeManager()
+ .setAvoidStaleDataNodesForWrite(true);
+ // Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
+ for (int i = 0; i < 3; i++) {
+ dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
+ }
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ // With dataNodes[0..2] marked stale, the chosen target must be a healthy node
+ targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
+
+ targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
+ assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
+
+ targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
+ assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
+ assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
+
+ targets = replicator.chooseTarget(filename, 4, dataNodes[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 4);
+ assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
+ assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
+ assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
+
+ // reset
+ namenode.getNamesystem().getBlockManager().getDatanodeManager()
+ .setAvoidStaleDataNodesForWrite(false);
+ for (int i = 0; i < dataNodes.length; i++) {
+ dataNodes[i].setLastUpdate(Time.now());
+ }
+ }
+
+ @Test
+ public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
+ conf.setBoolean(
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
+ String[] hosts = new String[]{"host1", "host2", "host3",
+ "host4", "host5", "host6"};
+ String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2",
+ "/d1/r2", "/d2/r3", "/d2/r3"};
+ MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).racks(racks)
+ .hosts(hosts).numDataNodes(hosts.length).build();
+ miniCluster.waitActive();
+
+ try {
+ // Step 1. Make two datanodes as stale, check whether the
+ // avoidStaleDataNodesForWrite calculation is correct.
+ // First stop the heartbeat of host1 and host2
+ for (int i = 0; i < 2; i++) {
+ DataNode dn = miniCluster.getDataNodes().get(i);
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getDatanode(dn.getDatanodeId())
+ .setLastUpdate(Time.now() - staleInterval - 1);
+ }
+ // Instead of waiting, explicitly call heartbeatCheck to
+ // let heartbeat manager to detect stale nodes
+ miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+ int numStaleNodes = miniCluster.getNameNode().getNamesystem()
+ .getBlockManager().getDatanodeManager().getNumStaleNodes();
+ assertEquals(numStaleNodes, 2);
+ assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+ // Call chooseTarget
+ DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
+ .getNamesystem().getBlockManager().getDatanodeManager()
+ .getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
+ BlockPlacementPolicy replicator = miniCluster.getNameNode()
+ .getNamesystem().getBlockManager().getBlockPlacementPolicy();
+ DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
+ staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+
+ // Step 2. Set more than half of the datanodes as stale
+ for (int i = 0; i < 4; i++) {
+ DataNode dn = miniCluster.getDataNodes().get(i);
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getDatanode(dn.getDatanodeId())
+ .setLastUpdate(Time.now() - staleInterval - 1);
+ }
+ // Explicitly call heartbeatCheck
+ miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+ numStaleNodes = miniCluster.getNameNode().getNamesystem()
+ .getBlockManager().getDatanodeManager().getNumStaleNodes();
+ assertEquals(numStaleNodes, 4);
+ // According to our strategy, stale datanodes will be included for writing
+ // to avoid hotspots
+ assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+ // Call chooseTarget
+ targets = replicator.chooseTarget(filename, 3,
+ staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
+
+ // Step 3. Set 2 stale datanodes back to healthy nodes,
+ // still have 2 stale nodes
+ for (int i = 2; i < 4; i++) {
+ DataNode dn = miniCluster.getDataNodes().get(i);
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+ miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getDatanode(dn.getDatanodeId())
+ .setLastUpdate(Time.now());
+ }
+ // Explicitly call heartbeatCheck
+ miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+ numStaleNodes = miniCluster.getNameNode().getNamesystem()
+ .getBlockManager().getDatanodeManager().getNumStaleNodes();
+ assertEquals(numStaleNodes, 2);
+ assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+ // Call chooseTarget
+ targets = replicator.chooseTarget(filename, 3,
+ staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+ } finally {
+ miniCluster.shutdown();
+ }
+ }
/**
* This testcase tests re-replication, when dataNodes[0] is already chosen.
@@ -490,8 +694,8 @@ public class TestReplicationPolicy {
.format(true).build();
try {
cluster.waitActive();
- final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
- .getNameNode().getNamesystem().getBlockManager().neededReplications;
+ final UnderReplicatedBlocks neededReplications = cluster.getNameNode()
+ .getNamesystem().getBlockManager().neededReplications;
for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority
neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
@@ -529,10 +733,10 @@ public class TestReplicationPolicy {
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
- // Adding QUEUE_UNDER_REPLICATED block
+ // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
- // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
+ // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
@@ -618,6 +822,11 @@ public class TestReplicationPolicy {
dataNodes[5].setRemaining(1*1024*1024);
replicaNodeList.add(dataNodes[5]);
+ // Refresh the last update time for all the datanodes
+ for (int i = 0; i < dataNodes.length; i++) {
+ dataNodes[i].setLastUpdate(Time.now());
+ }
+
List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
replicator.splitNodesWithRack(