Posted to hdfs-commits@hadoop.apache.org by ss...@apache.org on 2012/10/16 02:03:59 UTC
svn commit: r1398581 [4/9] - in
/hadoop/common/branches/MR-3902/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs-h...
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Tue Oct 16 00:02:55 2012
@@ -71,21 +71,6 @@ public abstract class BlockPlacementPoli
long blocksize);
/**
- * Same as
- * {{@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, HashMap, long)}
- * with returnChosenNodes equal to false.
- */
- final DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize) {
- return chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, false,
- excludedNodes, blocksize);
- }
-
- /**
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* to re-replicate a block with size <i>blocksize</i>
* If not, return as many as we can.
@@ -131,7 +116,7 @@ public abstract class BlockPlacementPoli
HashMap<Node, Node> excludedNodes,
long blocksize) {
return chooseTarget(srcBC.getName(), numOfReplicas, writer,
- chosenNodes, excludedNodes, blocksize);
+ chosenNodes, false, excludedNodes, blocksize);
}
/**
@@ -198,51 +183,6 @@ public abstract class BlockPlacementPoli
replicator.initialize(conf, stats, clusterMap);
return replicator;
}
-
- /**
- * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
- * a block with size <i>blocksize</i>
- * If not, return as many as we can.
- *
- * @param srcPath a string representation of the file for which chooseTarget is invoked
- * @param numOfReplicas number of replicas wanted.
- * @param writer the writer's machine, null if not in the cluster.
- * @param blocksize size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as targets
- * and sorted as a pipeline.
- */
- DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- long blocksize) {
- return chooseTarget(srcPath, numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(),
- blocksize);
- }
-
- /**
- * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
- * a block with size <i>blocksize</i>
- * If not, return as many as we can.
- *
- * @param srcPath a string representation of the file for which chooseTarget is invoked
- * @param numOfReplicas number of replicas wanted.
- * @param writer the writer's machine, null if not in the cluster.
- * @param blocksize size of the data to be written.
- * @param excludedNodes datanodes that should not be considered as targets.
- * @return array of DatanodeDescriptor instances chosen as targets
- * and sorted as a pipeline.
- */
- public DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- HashMap<Node, Node> excludedNodes,
- long blocksize) {
- return chooseTarget(srcPath, numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(),
- excludedNodes,
- blocksize);
- }
/**
* Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Tue Oct 16 00:02:55 2012
@@ -27,8 +27,6 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -55,9 +53,6 @@ import com.google.common.annotations.Vis
@InterfaceAudience.Private
public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
- private static final Log LOG =
- LogFactory.getLog(BlockPlacementPolicyDefault.class.getName());
-
private static final String enableDebugLogging =
"For more information, please enable DEBUG log level on "
+ LOG.getClass().getName();
@@ -67,6 +62,8 @@ public class BlockPlacementPolicyDefault
protected NetworkTopology clusterMap;
private FSClusterStats stats;
protected long heartbeatInterval; // interval for DataNode heartbeats
+ private long staleInterval; // interval used to identify stale DataNodes
+
/**
* A miss of that many heartbeats is tolerated for replica deletion policy.
*/
@@ -83,7 +80,8 @@ public class BlockPlacementPolicyDefault
@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
- this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
+ this.considerLoad = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
this.heartbeatInterval = conf.getLong(
@@ -92,6 +90,9 @@ public class BlockPlacementPolicyDefault
this.tolerateHeartbeatMultiplier = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
+ this.staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
protected ThreadLocal<StringBuilder> threadLocalBuilder =
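For illustration, the stale-interval key read above can be set like any other HDFS configuration property. A minimal sketch, not part of the patch; the 30-second value is an arbitrary example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class StaleIntervalConfExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Treat a DataNode as stale after 30 seconds without a heartbeat
        // (example value; the defaults ship with DFSConfigKeys).
        conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
            30 * 1000L);
        long staleInterval = conf.getLong(
            DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
            DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
        System.out.println("staleInterval = " + staleInterval + " ms");
      }
    }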
@@ -124,7 +125,6 @@ public class BlockPlacementPolicyDefault
excludedNodes, blocksize);
}
-
/** This is the implementation. */
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer,
@@ -161,8 +161,10 @@ public class BlockPlacementPolicyDefault
writer=null;
}
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
- excludedNodes, blocksize, maxNodesPerRack, results);
+ boolean avoidStaleNodes = (stats != null
+ && stats.isAvoidingStaleDataNodesForWrite());
+ DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (!returnChosenNodes) {
results.removeAll(chosenNodes);
}
@@ -178,8 +180,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results) {
-
+ List<DatanodeDescriptor> results,
+ final boolean avoidStaleNodes) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
@@ -190,18 +192,21 @@ public class BlockPlacementPolicyDefault
if (writer == null && !newBlock) {
writer = results.get(0);
}
-
+
+ // Keep a copy of original excludedNodes
+ final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
+ new HashMap<Node, Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
- writer = chooseLocalNode(writer, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ writer = chooseLocalNode(writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 1) {
- chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
+ chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
@@ -209,24 +214,36 @@ public class BlockPlacementPolicyDefault
if (numOfResults <= 2) {
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack,
+ results, avoidStaleNodes);
} else if (newBlock){
chooseLocalRack(results.get(1), excludedNodes, blocksize,
- maxNodesPerRack, results);
+ maxNodesPerRack, results, avoidStaleNodes);
} else {
- chooseLocalRack(writer, excludedNodes, blocksize,
- maxNodesPerRack, results);
+ chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
LOG.warn("Not able to place enough replicas, still in need of "
+ numOfReplicas + " to reach " + totalReplicasExpected + "\n"
+ e.getMessage());
+ if (avoidStaleNodes) {
+        // excludedNodes now contains the initial excludedNodes, any nodes
+        // that were chosen, and nodes that were tried but not chosen because
+        // they were stale, decommissioned, or otherwise unsuitable as write
+        // targets. Retry, this time without avoiding stale nodes.
+ for (Node node : results) {
+ oldExcludedNodes.put(node, node);
+ }
+ return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+ maxNodesPerRack, results, false);
+ }
}
return writer;
}
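The catch block above implements a two-pass placement: the first pass excludes stale nodes, and if it cannot satisfy the replica count, the exclusion set is rebuilt from the original exclusions plus the nodes already chosen, and placement is retried without avoiding stale nodes. A minimal standalone sketch of the same pattern, with hypothetical types rather than the HDFS API:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;

    public class TwoPassPlacement {
      static class NotEnoughReplicasException extends Exception {}

      interface Picker {
        // Appends chosen nodes to results; throws if n cannot be satisfied.
        void choose(int n, HashMap<String, String> excluded,
            List<String> results, boolean avoidStale)
            throws NotEnoughReplicasException;
      }

      static List<String> place(Picker picker, int n,
          HashMap<String, String> excluded) {
        // Copy the original exclusions, as the patch does, because pass 1
        // pollutes 'excluded' with every stale node it rejects.
        HashMap<String, String> original =
            new HashMap<String, String>(excluded);
        List<String> results = new ArrayList<String>();
        try {
          picker.choose(n, excluded, results, true);  // pass 1: avoid stale
        } catch (NotEnoughReplicasException e) {
          for (String node : results) {
            original.put(node, node);                 // keep pass-1 picks
          }
          try {
            // pass 2: widen the candidate set to include stale nodes
            picker.choose(n - results.size(), original, results, false);
          } catch (NotEnoughReplicasException e2) {
            // even the relaxed pass fell short; return the partial pipeline
          }
        }
        return results;
      }
    }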
@@ -241,26 +258,27 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
if (preferLocalNode) {
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
- if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
+ if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
+ results, avoidStaleNodes)) {
results.add(localMachine);
return localMachine;
}
}
}
// try a node on local rack
- return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseLocalRack(localMachine, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
/* choose one node from the rack that <i>localMachine</i> is on.
@@ -275,19 +293,19 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local rack
try {
- return chooseRandom(
- localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -301,18 +319,17 @@ public class BlockPlacementPolicyDefault
}
if (newLocal != null) {
try {
- return chooseRandom(
- newLocal.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@@ -328,17 +345,19 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
- chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxReplicasPerRack, results);
+ chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
+ excludedNodes, blocksize, maxReplicasPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
+ maxReplicasPerRack, results, avoidStaleNodes);
}
}
@@ -350,7 +369,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -368,7 +388,8 @@ public class BlockPlacementPolicyDefault
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ if (isGoodTarget(chosenNode, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes)) {
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
@@ -395,7 +416,8 @@ public class BlockPlacementPolicyDefault
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
@@ -414,7 +436,8 @@ public class BlockPlacementPolicyDefault
if (oldNode == null) {
numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ if (isGoodTarget(chosenNode, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
@@ -455,16 +478,34 @@ public class BlockPlacementPolicyDefault
* does not have too much load, and the rack does not have too many nodes
*/
private boolean isGoodTarget(DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
- List<DatanodeDescriptor> results) {
- return isGoodTarget(node, blockSize, maxTargetPerLoc,
- this.considerLoad, results);
+ long blockSize, int maxTargetPerRack,
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
+ return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
+ results, avoidStaleNodes);
}
-
+
+ /**
+ * Determine if a node is a good target.
+ *
+ * @param node The target node
+ * @param blockSize Size of block
+ * @param maxTargetPerRack Maximum number of targets per rack. The value of
+ * this parameter depends on the number of racks in
+ * the cluster and total number of replicas for a block
+ * @param considerLoad whether or not to consider load of the target node
+ * @param results A list containing currently chosen nodes. Used to check if
+   *                too many nodes have been chosen in the target rack.
+ * @param avoidStaleNodes Whether or not to avoid choosing stale nodes
+   * @return true if <i>node</i> has enough space,
+ * does not have too much load,
+ * and the rack does not have too many nodes.
+ */
protected boolean isGoodTarget(DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
+ long blockSize, int maxTargetPerRack,
boolean considerLoad,
- List<DatanodeDescriptor> results) {
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
// check if the node is (being) decommissed
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
if(LOG.isDebugEnabled()) {
@@ -475,6 +516,17 @@ public class BlockPlacementPolicyDefault
return false;
}
+ if (avoidStaleNodes) {
+ if (node.isStale(this.staleInterval)) {
+ if (LOG.isDebugEnabled()) {
+ threadLocalBuilder.get().append(node.toString()).append(": ")
+ .append("Node ").append(NodeBase.getPath(node))
+ .append(" is not chosen because the node is staled ");
+ }
+ return false;
+ }
+ }
+
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine
@@ -514,7 +566,7 @@ public class BlockPlacementPolicyDefault
counter++;
}
}
- if (counter>maxTargetPerLoc) {
+ if (counter>maxTargetPerRack) {
if(LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Tue Oct 16 00:02:55 2012
@@ -64,23 +64,20 @@ public class BlockPlacementPolicyWithNod
* @return the chosen node
*/
@Override
- protected DatanodeDescriptor chooseLocalNode(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
+ maxNodesPerRack, false, results, avoidStaleNodes)) {
results.add(localMachine);
// Nodes under same nodegroup should be excluded.
addNodeGroupToExcludedNodes(excludedNodes,
@@ -92,18 +89,15 @@ public class BlockPlacementPolicyWithNod
// try a node on local node group
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (chosenNode != null) {
return chosenNode;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
- /**
- * {@inheritDoc}
- */
@Override
protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
Node chosenNode) {
@@ -121,21 +115,16 @@ public class BlockPlacementPolicyWithNod
}
}
- /**
- * {@inheritDoc}
- */
@Override
- protected DatanodeDescriptor chooseLocalRack(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
+ protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results,
+ avoidStaleNodes);
}
// choose one from the local rack, but off-nodegroup
@@ -143,7 +132,8 @@ public class BlockPlacementPolicyWithNod
return chooseRandom(NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize,
- maxNodesPerRack, results);
+ maxNodesPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -157,42 +147,39 @@ public class BlockPlacementPolicyWithNod
}
if (newLocal != null) {
try {
- return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
- /**
- * {@inheritDoc}
- */
@Override
protected void chooseRemoteRack(int numOfReplicas,
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
+ DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+ long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
- chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
- localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxReplicasPerRack, results);
+ chooseRandom(
+ numOfReplicas,
+ "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxReplicasPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
- chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
- localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
+ chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+ localMachine.getNetworkLocation(), excludedNodes, blocksize,
+ maxReplicasPerRack, results, avoidStaleNodes);
}
}
@@ -202,19 +189,22 @@ public class BlockPlacementPolicyWithNod
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
- DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
- int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+ private DatanodeDescriptor chooseLocalNodeGroup(
+ NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local node group
try {
- return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -228,17 +218,19 @@ public class BlockPlacementPolicyWithNod
}
if (newLocal != null) {
try {
- return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results,
+ avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Tue Oct 16 00:02:55 2012
@@ -94,7 +94,7 @@ class BlocksMap {
}
void close() {
- blocks = null;
+ // Empty blocks once GSet#clear is implemented (HDFS-3940)
}
BlockCollection getBlockCollection(Block b) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Oct 16 00:02:55 2012
@@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
-import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@@ -76,6 +76,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
@@ -88,8 +89,8 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
-
private final HeartbeatManager heartbeatManager;
+ private Daemon decommissionthread = null;
/**
* Stores the datanode -> block map.
@@ -127,23 +128,33 @@ public class DatanodeManager {
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
+ /** Whether or not to check stale DataNodes for read/write */
+ private final boolean checkForStaleDataNodes;
+
+  /** The interval used to mark DataNodes as stale for read/write */
+ private final long staleInterval;
+
+ /** Whether or not to avoid using stale DataNodes for writing */
+ private volatile boolean avoidStaleDataNodesForWrite;
+
+ /** The number of stale DataNodes */
+ private volatile int numStaleNodes;
+
/**
* Whether or not this cluster has ever consisted of more than 1 rack,
* according to the NetworkTopology.
*/
private boolean hasClusterEverBeenMultiRack = false;
- DatanodeManager(final BlockManager blockManager,
- final Namesystem namesystem, final Configuration conf
- ) throws IOException {
+ DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
+ final Configuration conf) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
Class<? extends NetworkTopology> networkTopologyClass =
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
NetworkTopology.class, NetworkTopology.class);
- networktopology = (NetworkTopology) ReflectionUtils.newInstance(
- networkTopologyClass, conf);
+ networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
@@ -176,10 +187,69 @@ public class DatanodeManager {
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
+
+ checkForStaleDataNodes = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+
+ staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+ avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
+ checkForStaleDataNodes);
}
-
- private Daemon decommissionthread = null;
-
+
+ private static long getStaleIntervalFromConf(Configuration conf,
+ long heartbeatExpireInterval) {
+ long staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+ Preconditions.checkArgument(staleInterval > 0,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
+ " = '" + staleInterval + "' is invalid. " +
+ "It should be a positive non-zero value.");
+
+ final long heartbeatIntervalSeconds = conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    // The stale interval cannot be smaller than a configured multiple
+    // (default 3) of the heartbeat interval
+ final long minStaleInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
+ * heartbeatIntervalSeconds * 1000;
+ if (staleInterval < minStaleInterval) {
+ LOG.warn("The given interval for marking stale datanode = "
+ + staleInterval + ", which is less than "
+ + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+ + " heartbeat intervals. This may cause too frequent changes of "
+ + "stale states of DataNodes since a heartbeat msg may be missing "
+ + "due to temporary short-term failures. Reset stale interval to "
+ + minStaleInterval + ".");
+ staleInterval = minStaleInterval;
+ }
+ if (staleInterval > heartbeatExpireInterval) {
+ LOG.warn("The given interval for marking stale datanode = "
+ + staleInterval + ", which is larger than heartbeat expire interval "
+ + heartbeatExpireInterval + ".");
+ }
+ return staleInterval;
+ }
+
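+ The warnings above encode a clamping rule: the interval is raised to a
+ floor of a configured multiple of the heartbeat interval, while values
+ above the heartbeat expiry are allowed but logged. A compact sketch of
+ that rule in isolation (names are mine, not the HDFS method itself):
+
+     static long clampStaleInterval(long configuredMs, long heartbeatMs,
+         long minMultiplier, long expireMs) {
+       long floor = minMultiplier * heartbeatMs;
+       if (configuredMs < floor) {
+         return floor;          // too aggressive: reset to the minimum
+       }
+       // configuredMs > expireMs is permitted; the patch only warns here.
+       return configuredMs;
+     }
+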
+ static boolean getAvoidStaleForWriteFromConf(Configuration conf,
+ boolean checkForStale) {
+ boolean avoid = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+ boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
+ if (!checkForStale && avoid) {
+ LOG.warn("Cannot set "
+ + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+ + " as false while setting "
+ + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+ + " as true.");
+ }
+ return avoidStaleDataNodesForWrite;
+ }
+
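+ Note how the two flags interact: avoiding stale DataNodes for writes only
+ takes effect when stale checking itself is enabled, and requesting
+ avoidance with checking disabled is logged as a misconfiguration. The
+ effective setting reduces to a conjunction; a one-line sketch (names mine):
+
+     // Effective write-avoidance: honored only when the stale check is on.
+     static boolean effectiveAvoidStaleForWrite(boolean checkForStale,
+         boolean avoidRequested) {
+       return checkForStale && avoidRequested;
+     }
+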
void activate(final Configuration conf) {
final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
this.decommissionthread = new Daemon(dm.new Monitor(
@@ -193,7 +263,13 @@ public class DatanodeManager {
}
void close() {
- if (decommissionthread != null) decommissionthread.interrupt();
+ if (decommissionthread != null) {
+ decommissionthread.interrupt();
+ try {
+ decommissionthread.join(3000);
+ } catch (InterruptedException e) {
+ }
+ }
heartbeatManager.close();
}
@@ -226,14 +302,18 @@ public class DatanodeManager {
if (rName != null)
client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
}
+
+ Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ?
+ new DFSUtil.DecomStaleComparator(staleInterval) :
+ DFSUtil.DECOM_COMPARATOR;
+
for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
-
- // Move decommissioned datanodes to the bottom
- Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
+ // Move decommissioned/stale datanodes to the bottom
+ Arrays.sort(b.getLocations(), comparator);
}
}
-
+
CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
final String firstkey) {
return new CyclicIteration<String, DatanodeDescriptor>(
@@ -583,7 +663,8 @@ public class DatanodeManager {
+ " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
- DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getXferAddr());
+ DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
+ nodeReg.getIpAddr(), nodeReg.getXferPort());
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
@@ -693,7 +774,7 @@ public class DatanodeManager {
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
- private void refreshDatanodes() throws IOException {
+ private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node)) {
@@ -752,7 +833,61 @@ public class DatanodeManager {
namesystem.readUnlock();
}
}
+
+ /* Getter and Setter for stale DataNodes related attributes */
+
+ /**
+ * @return whether or not to avoid writing to stale datanodes
+ */
+ public boolean isAvoidingStaleDataNodesForWrite() {
+ return avoidStaleDataNodesForWrite;
+ }
+
+ /**
+ * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}.
+   * The HeartbeatManager disables avoidStaleDataNodesForWrite when more than
+ * half of the DataNodes are marked as stale.
+ *
+ * @param avoidStaleDataNodesForWrite
+ * The value to set to
+ * {@link DatanodeManager#avoidStaleDataNodesForWrite}
+ */
+ void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
+ this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+ }
+
+ /**
+ * @return Whether or not to check stale DataNodes for R/W
+ */
+ boolean isCheckingForStaleDataNodes() {
+ return checkForStaleDataNodes;
+ }
+
+ /**
+ * @return The time interval used to mark DataNodes as stale.
+ */
+ long getStaleInterval() {
+ return staleInterval;
+ }
+ /**
+   * Set the number of currently stale DataNodes. The HeartbeatManager
+   * computes this number from DataNodes' heartbeats.
+ *
+ * @param numStaleNodes
+ * The number of stale DataNodes to be set.
+ */
+ void setNumStaleNodes(int numStaleNodes) {
+ this.numStaleNodes = numStaleNodes;
+ }
+
+ /**
+   * @return The current number of stale DataNodes (detected by
+ * HeartbeatManager).
+ */
+ int getNumStaleNodes() {
+ return this.numStaleNodes;
+ }
/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
@@ -931,7 +1066,7 @@ public class DatanodeManager {
return nodes;
}
- private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+ private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdate(0);
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Tue Oct 16 00:02:55 2012
@@ -26,11 +26,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
+import com.google.common.base.Preconditions;
+
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
@@ -55,18 +56,48 @@ class HeartbeatManager implements Datano
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
-
+ /**
+ * The initial setting of end user which indicates whether or not to avoid
+ * writing to stale datanodes.
+ */
+ private final boolean initialAvoidWriteStaleNodes;
+ /**
+ * When the ratio of stale datanodes reaches this number, stop avoiding
+ * writing to stale datanodes, i.e., continue using stale nodes for writing.
+ */
+ private final float ratioUseStaleDataNodesForWrite;
+
final Namesystem namesystem;
final BlockManager blockManager;
- HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
- final Configuration conf) {
- this.heartbeatRecheckInterval = conf.getInt(
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
-
+ HeartbeatManager(final Namesystem namesystem,
+ final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
+ boolean checkStaleNodes = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+ long recheckInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
+ long staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
+ this.initialAvoidWriteStaleNodes = DatanodeManager
+ .getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
+ this.ratioUseStaleDataNodesForWrite = conf.getFloat(
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+ Preconditions.checkArgument(
+ (ratioUseStaleDataNodesForWrite > 0 &&
+ ratioUseStaleDataNodesForWrite <= 1.0f),
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
+ " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
+ "It should be a positive non-zero float value, not greater than 1.0f.");
+
+ this.heartbeatRecheckInterval = (checkStaleNodes
+ && initialAvoidWriteStaleNodes
+ && staleInterval < recheckInterval) ? staleInterval : recheckInterval;
}
void activate(Configuration conf) {
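The last assignment in the constructor shortens the monitor's recheck period to the stale interval when staleness both matters for writes and would otherwise be detected too slowly. A sketch of the selection (names are mine):

    // Recheck at the stale interval only when stale checking is on, writes
    // initially avoid stale nodes, and the stale interval is the shorter
    // period; otherwise keep the normal recheck interval (5-minute default).
    static long effectiveRecheckMs(boolean checkStale, boolean avoidWriteStale,
        long staleMs, long recheckMs) {
      return (checkStale && avoidWriteStale && staleMs < recheckMs)
          ? staleMs : recheckMs;
    }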
@@ -75,6 +106,11 @@ class HeartbeatManager implements Datano
void close() {
heartbeatThread.interrupt();
+ try {
+      // This has no effect if the thread hasn't yet been started.
+ heartbeatThread.join(3000);
+ } catch (InterruptedException e) {
+ }
}
synchronized int getLiveDatanodeCount() {
@@ -206,16 +242,39 @@ class HeartbeatManager implements Datano
if (namesystem.isInSafeMode()) {
return;
}
+ boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
+ // check the number of stale nodes
+ int numOfStaleNodes = 0;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
- if (dm.isDatanodeDead(d)) {
+ if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
- break;
+ if (!checkStaleNodes) {
+ break;
+ }
+ }
+ if (checkStaleNodes &&
+ d.isStale(dm.getStaleInterval())) {
+ numOfStaleNodes++;
+ }
+ }
+
+ // Change whether to avoid using stale datanodes for writing
+ // based on proportion of stale datanodes
+ if (checkStaleNodes) {
+ dm.setNumStaleNodes(numOfStaleNodes);
+ if (numOfStaleNodes >
+ datanodes.size() * ratioUseStaleDataNodesForWrite) {
+ dm.setAvoidStaleDataNodesForWrite(false);
+ } else {
+ if (this.initialAvoidWriteStaleNodes) {
+ dm.setAvoidStaleDataNodesForWrite(true);
+ }
}
}
}
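The monitor thus counts stale nodes on every pass and disables write-avoidance once they exceed the configured ratio of the cluster, re-enabling it (only if the user turned it on initially) when the count drops back. A standalone sketch of the toggle, not the HDFS code:

    // Should writes avoid stale nodes on this heartbeat pass?
    static boolean avoidStaleForWrite(int staleNodes, int totalNodes,
        float ratio, boolean initialAvoid) {
      if (staleNodes > totalNodes * ratio) {
        return false;        // too much of the cluster is stale: stop avoiding
      }
      return initialAvoid;   // under the ratio: restore the user's preference
    }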
@@ -224,10 +283,10 @@ class HeartbeatManager implements Datano
if (!allAlive) {
// acquire the fsnamesystem lock, and then remove the dead node.
namesystem.writeLock();
- if (namesystem.isInSafeMode()) {
- return;
- }
try {
+ if (namesystem.isInSafeMode()) {
+ return;
+ }
synchronized(this) {
dm.removeDeadDatanode(dead);
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Tue Oct 16 00:02:55 2012
@@ -159,6 +159,35 @@ class Host2NodesMap {
}
}
+ /**
+   * Find a DataNode by its transfer address.
+ *
+ * @return DatanodeDescriptor if found or null otherwise
+ */
+ public DatanodeDescriptor getDatanodeByXferAddr(String ipAddr,
+ int xferPort) {
+    if (ipAddr == null) {
+ return null;
+ }
+
+ hostmapLock.readLock().lock();
+ try {
+ DatanodeDescriptor[] nodes = map.get(ipAddr);
+ // no entry
+      if (nodes == null) {
+ return null;
+ }
+      for (DatanodeDescriptor containedNode : nodes) {
+ if (xferPort == containedNode.getXferPort()) {
+ return containedNode;
+ }
+ }
+ return null;
+ } finally {
+ hostmapLock.readLock().unlock();
+ }
+ }
+
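+ A hypothetical caller, mirroring how registerDatanode now resolves a node
+ by IP address and transfer port rather than by host name alone; the
+ address, port, and map variable are illustrative assumptions:
+
+     // Distinguishes multiple DataNodes sharing one host, e.g. test
+     // clusters where several DataNodes bind different ports on one IP.
+     DatanodeDescriptor node =
+         host2DatanodeMap.getDatanodeByXferAddr("10.0.0.7", 50010);
+     if (node == null) {
+       // no DataNode registered at that address and port
+     }
+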
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Tue Oct 16 00:02:55 2012
@@ -22,11 +22,7 @@ import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Tue Oct 16 00:02:55 2012
@@ -39,7 +39,8 @@ public final class HdfsServerConstants {
*/
static public enum NodeType {
NAME_NODE,
- DATA_NODE;
+ DATA_NODE,
+ JOURNAL_NODE;
}
/** Startup options */
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Oct 16 00:02:55 2012
@@ -44,7 +44,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -60,14 +59,12 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.http.HtmlQuoting;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.util.KerberosName;
-import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
@@ -279,6 +276,9 @@ public class JspHelper {
FIELD_PERCENT_REMAINING = 9,
FIELD_ADMIN_STATE = 10,
FIELD_DECOMMISSIONED = 11,
+ FIELD_BLOCKPOOL_USED = 12,
+ FIELD_PERBLOCKPOOL_USED = 13,
+ FIELD_FAILED_VOLUMES = 14,
SORT_ORDER_ASC = 1,
SORT_ORDER_DSC = 2;
@@ -306,6 +306,12 @@ public class JspHelper {
sortField = FIELD_ADMIN_STATE;
} else if (field.equals("decommissioned")) {
sortField = FIELD_DECOMMISSIONED;
+ } else if (field.equals("bpused")) {
+ sortField = FIELD_BLOCKPOOL_USED;
+ } else if (field.equals("pcbpused")) {
+ sortField = FIELD_PERBLOCKPOOL_USED;
+ } else if (field.equals("volfails")) {
+ sortField = FIELD_FAILED_VOLUMES;
} else {
sortField = FIELD_NAME;
}
@@ -364,6 +370,18 @@ public class JspHelper {
case FIELD_NAME:
ret = d1.getHostName().compareTo(d2.getHostName());
break;
+ case FIELD_BLOCKPOOL_USED:
+ dlong = d1.getBlockPoolUsed() - d2.getBlockPoolUsed();
+ ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
+ break;
+ case FIELD_PERBLOCKPOOL_USED:
+ ddbl = d1.getBlockPoolUsedPercent() - d2.getBlockPoolUsedPercent();
+ ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0);
+ break;
+ case FIELD_FAILED_VOLUMES:
+ int dint = d1.getVolumeFailures() - d2.getVolumeFailures();
+ ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0);
+ break;
}
return (sortOrder == SORT_ORDER_DSC) ? -ret : ret;
}
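The three new cases follow the file's existing sign-of-difference idiom. On Java 7 and later the same comparisons could be written with the boxed compare helpers, avoiding the intermediate subtraction; a sketch, not part of the patch:

    int byBpUsed = Long.compare(d1.getBlockPoolUsed(), d2.getBlockPoolUsed());
    int byPctBpUsed = Double.compare(d1.getBlockPoolUsedPercent(),
        d2.getBlockPoolUsedPercent());
    int byVolFails = Integer.compare(d1.getVolumeFailures(),
        d2.getVolumeFailures());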
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Tue Oct 16 00:02:55 2012
@@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.VersionInfo;
+import com.google.common.base.Preconditions;
+
/**
@@ -76,7 +78,7 @@ public abstract class Storage extends St
/** Layout versions of 0.20.203 release */
public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
- private static final String STORAGE_FILE_LOCK = "in_use.lock";
+ public static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
public static final String STORAGE_DIR_PREVIOUS = "previous";
@@ -86,6 +88,12 @@ public abstract class Storage extends St
public static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
public static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
+ /**
+ * The blocksBeingWritten directory which was used in some 1.x and earlier
+ * releases.
+ */
+ public static final String STORAGE_1_BBW = "blocksBeingWritten";
+
public enum StorageState {
NON_EXISTENT,
NOT_FORMATTED,
@@ -746,6 +754,15 @@ public abstract class Storage extends St
return storageDirs.get(idx);
}
+ /**
+ * @return the storage directory, with the precondition that this storage
+ * has exactly one storage directory
+ */
+ public StorageDirectory getSingularStorageDir() {
+ Preconditions.checkState(storageDirs.size() == 1);
+ return storageDirs.get(0);
+ }
+
protected void addStorageDir(StorageDirectory sd) {
storageDirs.add(sd);
}
@@ -853,6 +870,7 @@ public abstract class Storage extends St
* @return a string representation of the formattable item, suitable
* for display to the user inside a prompt
*/
+ @Override
public String toString();
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Oct 16 00:02:55 2012
@@ -51,6 +51,8 @@ import org.apache.hadoop.hdfs.util.DataT
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Scans the block files under a block pool and verifies that the
* files are not corrupt.
@@ -255,6 +257,11 @@ class BlockPoolSliceScanner {
}
}
+ @VisibleForTesting
+ long getTotalScans() {
+ return totalScans;
+ }
+
/** @return the last scan time for the block pool. */
long getLastScanTime() {
return lastScanTime.get();
@@ -367,7 +374,8 @@ class BlockPoolSliceScanner {
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
}
- private void verifyBlock(ExtendedBlock block) {
+ @VisibleForTesting
+ void verifyBlock(ExtendedBlock block) {
BlockSender blockSender = null;
/* In case of failure, attempt to read second time to reduce
@@ -563,7 +571,24 @@ class BlockPoolSliceScanner {
currentPeriodStart = Time.now();
}
+ private synchronized boolean workRemainingInCurrentPeriod() {
+ if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
+ currentPeriodStart + ", period=" + scanPeriod + ", now=" +
+ Time.now() + " " + blockPoolId);
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+
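+ scanBlockPoolSlice (below) uses this guard to return early once the byte
+ budget for the current period is spent. The predicate reduces to a small
+ pure function; a sketch with my own names:
+
+     // Work remains unless the budget is exhausted and the period has not
+     // yet rolled over.
+     static boolean workRemaining(long bytesLeft, long periodStartMs,
+         long periodMs, long nowMs) {
+       return !(bytesLeft <= 0 && nowMs < periodStartMs + periodMs);
+     }
+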
void scanBlockPoolSlice() {
+ if (!workRemainingInCurrentPeriod()) {
+ return;
+ }
+
// Create a new processedBlocks structure
processedBlocks = new HashMap<Long, Integer>();
if (!assignInitialVerificationTimes()) {
@@ -608,14 +633,14 @@ class BlockPoolSliceScanner {
LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
throw e;
} finally {
- cleanUp();
+ rollVerificationLogs();
if (LOG.isDebugEnabled()) {
LOG.debug("Done scanning block pool: " + blockPoolId);
}
}
}
- private synchronized void cleanUp() {
+ private synchronized void rollVerificationLogs() {
if (verificationLog != null) {
try {
verificationLog.logs.roll();
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Oct 16 00:02:55 2012
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* DataBlockScanner manages block scanning for all the block pools. For each
* block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
@@ -47,6 +49,8 @@ public class DataBlockScanner implements
private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
private final Configuration conf;
+ static final int SLEEP_PERIOD_MS = 5 * 1000;
+
/**
* Map to find the BlockPoolScanner for a given block pool id. This is updated
* when a BPOfferService becomes alive or dies.
@@ -68,10 +72,10 @@ public class DataBlockScanner implements
String currentBpId = "";
boolean firstRun = true;
while (datanode.shouldRun && !Thread.interrupted()) {
- //Sleep everytime except in the first interation.
+ //Sleep every time except in the first iteration.
if (!firstRun) {
try {
- Thread.sleep(5000);
+ Thread.sleep(SLEEP_PERIOD_MS);
} catch (InterruptedException ex) {
// Interrupt itself again to set the interrupt status
blockScannerThread.interrupt();
@@ -103,7 +107,7 @@ public class DataBlockScanner implements
while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
|| (getBlockPoolSetSize() < 1)) {
try {
- Thread.sleep(5000);
+ Thread.sleep(SLEEP_PERIOD_MS);
} catch (InterruptedException e) {
blockScannerThread.interrupt();
return;
@@ -168,7 +172,8 @@ public class DataBlockScanner implements
return blockPoolScannerMap.size();
}
- private synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
+ @VisibleForTesting
+ synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
return blockPoolScannerMap.get(bpid);
}
@@ -249,7 +254,7 @@ public class DataBlockScanner implements
LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
}
- // This method is used for testing
+ @VisibleForTesting
long getBlocksScannedInLastRun(String bpid) throws IOException {
BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
if (bpScanner == null) {
@@ -259,6 +264,16 @@ public class DataBlockScanner implements
}
}
+ @VisibleForTesting
+ long getTotalScans(String bpid) throws IOException {
+ BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
+ if (bpScanner == null) {
+ throw new IOException("Block Pool: "+bpid+" is not running");
+ } else {
+ return bpScanner.getTotalScans();
+ }
+ }
+
public void start() {
blockScannerThread = new Thread(this);
blockScannerThread.setDaemon(true);
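
With getBPScanner() and getTotalScans() opened up via @VisibleForTesting, a test can assert per-block-pool scan progress. A rough sketch, assuming the helper lives in the same package so the package-private hooks are reachable; how the DataBlockScanner reference is obtained (e.g. from a MiniDFSCluster's DataNode) is outside this patch:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.IOException;

    class ScanProgressCheckSketch {
      // Illustrative helper: blocks until at least one more scan of the
      // given block pool completes, polling on the new SLEEP_PERIOD_MS
      // constant.
      static void awaitOneMoreScan(DataBlockScanner scanner, String bpid)
          throws IOException, InterruptedException {
        long before = scanner.getTotalScans(bpid);
        while (scanner.getTotalScans(bpid) <= before) {
          Thread.sleep(DataBlockScanner.SLEEP_PERIOD_MS);
        }
      }
    }
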
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Oct 16 00:02:55 2012
@@ -251,7 +251,7 @@ public class DataNode extends Configured
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
private DNConf dnConf;
- private boolean heartbeatsDisabledForTests = false;
+ private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
DataNodeMetrics metrics;
@@ -277,7 +277,7 @@ public class DataNode extends Configured
private AbstractList<File> dataDirs;
private Configuration conf;
- private final String userWithLocalPathAccess;
+ private final List<String> usersWithLocalPathAccess;
private boolean connectToDnViaHostname;
ReadaheadPool readaheadPool;
private final boolean getHdfsBlockLocationsEnabled;
@@ -300,8 +300,8 @@ public class DataNode extends Configured
final SecureResources resources) throws IOException {
super(conf);
- this.userWithLocalPathAccess =
- conf.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
+ this.usersWithLocalPathAccess = Arrays.asList(
+ conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
this.connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
@@ -417,10 +417,15 @@ public class DataNode extends Configured
new ClientDatanodeProtocolServerSideTranslatorPB(this);
BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(clientDatanodeProtocolXlator);
- ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
- .getHostName(), ipcAddr.getPort(), conf.getInt(
- DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
- false, conf, blockPoolTokenSecretManager);
+ ipcServer = new RPC.Builder(conf)
+ .setProtocol(ClientDatanodeProtocolPB.class)
+ .setInstance(service)
+ .setBindAddress(ipcAddr.getHostName())
+ .setPort(ipcAddr.getPort())
+ .setNumHandlers(
+ conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
+ DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
+ .setSecretManager(blockPoolTokenSecretManager).build();
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this);
@@ -1007,7 +1012,7 @@ public class DataNode extends Configured
private void checkBlockLocalPathAccess() throws IOException {
checkKerberosAuthMethod("getBlockLocalPathInfo()");
String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
- if (!currentUser.equals(this.userWithLocalPathAccess)) {
+ if (!usersWithLocalPathAccess.contains(currentUser)) {
throw new AccessControlException(
"Can't continue with getBlockLocalPathInfo() "
+ "authorization. The user " + currentUser
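
The local-path-access check above now consults a list: DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY is read with getTrimmedStrings(), so the setting accepts a comma-separated set of user names with surrounding whitespace ignored. A small sketch of the new behavior (the user names are placeholders):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class LocalPathAccessSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Two users, with stray whitespace that getTrimmedStrings removes.
        conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
            "hbase, mapred");
        List<String> users = Arrays.asList(conf.getTrimmedStrings(
            DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
        System.out.println(users.contains("hbase"));   // true
        System.out.println(users.contains("alice"));   // false
      }
    }
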
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Oct 16 00:02:55 2012
@@ -451,6 +451,8 @@ public class DataStorage extends Storage
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
+ File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
+
assert curDir.exists() : "Data node current directory must exist.";
// Cleanup directory "detach"
cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
@@ -471,7 +473,7 @@ public class DataStorage extends Storage
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(curDir, nsInfo);
- linkAllBlocks(tmpDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+ linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
// 4. Write version file under <SD>/current
layoutVersion = HdfsConstants.LAYOUT_VERSION;
@@ -578,15 +580,21 @@ public class DataStorage extends Storage
+ "; cur CTime = " + this.getCTime());
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();//finalized.tmp directory
+ final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
// 1. rename previous to finalized.tmp
rename(prevDir, tmpDir);
// 2. delete finalized.tmp dir in a separate thread
+ // Also delete the blocksBeingWritten directory from HDFS 1.x and
+ // earlier, if it exists.
new Daemon(new Runnable() {
@Override
public void run() {
try {
deleteDir(tmpDir);
+ if (bbwDir.exists()) {
+ deleteDir(bbwDir);
+ }
} catch(IOException ex) {
LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
}
@@ -620,11 +628,16 @@ public class DataStorage extends Storage
/**
* Hardlink all finalized and RBW blocks in fromDir to toDir
- * @param fromDir directory where the snapshot is stored
- * @param toDir the current data directory
- * @throws IOException if error occurs during hardlink
+ *
+ * @param fromDir The directory where the 'from' snapshot is stored
+ * @param fromBbwDir In HDFS 1.x, the directory where blocks
+ * that are under construction are stored.
+ * @param toDir The current data directory
+ *
+ * @throws IOException If error occurs during hardlink
*/
- private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+ private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
+ throws IOException {
HardLink hardLink = new HardLink();
// do the link
int diskLayoutVersion = this.getLayoutVersion();
@@ -632,13 +645,23 @@ public class DataStorage extends Storage
// hardlink finalized blocks in tmpDir/finalized
linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
- // hardlink rbw blocks in tmpDir/finalized
+ // hardlink rbw blocks in tmpDir/rbw
linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
} else { // pre-RBW version
// hardlink finalized blocks in tmpDir
linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
diskLayoutVersion, hardLink);
+ if (fromBbwDir.exists()) {
+ /*
+ * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw
+ * directory. It's a little messy, because the blocksBeingWritten was
+ * NOT underneath the 'current' directory in those releases. See
+ * HDFS-3731 for details.
+ */
+ linkBlocks(fromBbwDir,
+ new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
+ }
}
LOG.info( hardLink.linkStats.report() );
}
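
For upgrades from pre-federation layouts, the hunk above hardlinks blocks out of the old top-level blocksBeingWritten directory into the block pool's rbw directory, and finalize now removes that directory. A rough sketch of the layout involved; the storage root path and block pool id are placeholders, and the literal value of Storage.STORAGE_1_BBW is assumed here to be "blocksBeingWritten":

    import java.io.File;

    public class BbwLayoutSketch {
      public static void main(String[] args) {
        File storageRoot = new File("/data/1/dfs/dn");      // placeholder
        // HDFS 1.x kept replicas under construction *beside* 'current',
        // not beneath it (see HDFS-3731).
        File bbwDir = new File(storageRoot, "blocksBeingWritten");
        // Post-upgrade destination inside the block pool slice.
        File rbwDir = new File(storageRoot,
            "current/BP-1-127.0.0.1-1234/current/rbw");     // placeholder
        System.out.println("upgrade hardlinks " + bbwDir + " -> " + rbwDir);
      }
    }
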
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Oct 16 00:02:55 2012
@@ -609,6 +609,7 @@ class DataXceiver extends Receiver imple
.setBytesPerCrc(bytesPerCRC)
.setCrcPerBlock(crcPerBlock)
.setMd5(ByteString.copyFrom(md5.getDigest()))
+ .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()))
)
.build()
.writeDelimitedTo(out);
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java Tue Oct 16 00:02:55 2012
@@ -16,11 +16,11 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
-
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
+import java.security.GeneralSecurityException;
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
@@ -28,9 +28,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.mortbay.jetty.Connector;
import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+
+import javax.net.ssl.SSLServerSocketFactory;
/**
* Utility class to start a datanode in a secure cluster, first obtaining
@@ -42,9 +48,9 @@ public class SecureDataNodeStarter imple
*/
public static class SecureResources {
private final ServerSocket streamingSocket;
- private final SelectChannelConnector listener;
+ private final Connector listener;
public SecureResources(ServerSocket streamingSocket,
- SelectChannelConnector listener) {
+ Connector listener) {
this.streamingSocket = streamingSocket;
this.listener = listener;
@@ -52,12 +58,13 @@ public class SecureDataNodeStarter imple
public ServerSocket getStreamingSocket() { return streamingSocket; }
- public SelectChannelConnector getListener() { return listener; }
+ public Connector getListener() { return listener; }
}
private String [] args;
private SecureResources resources;
-
+ private SSLFactory sslFactory;
+
@Override
public void init(DaemonContext context) throws Exception {
System.err.println("Initializing secure datanode resources");
@@ -82,13 +89,30 @@ public class SecureDataNodeStarter imple
}
// Obtain secure listener for web server
- SelectChannelConnector listener =
- (SelectChannelConnector)HttpServer.createDefaultChannelConnector();
+ Connector listener;
+ if (HttpConfig.isSecure()) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+ try {
+ sslFactory.init();
+ } catch (GeneralSecurityException ex) {
+ throw new IOException(ex);
+ }
+ SslSocketConnector sslListener = new SslSocketConnector() {
+ @Override
+ protected SSLServerSocketFactory createFactory() throws Exception {
+ return sslFactory.createSSLServerSocketFactory();
+ }
+ };
+ listener = sslListener;
+ } else {
+ listener = HttpServer.createDefaultChannelConnector();
+ }
+
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
listener.setHost(infoSocAddr.getHostName());
listener.setPort(infoSocAddr.getPort());
// Open listener here in order to bind to port as root
- listener.open();
+ listener.open();
if (listener.getPort() != infoSocAddr.getPort()) {
throw new RuntimeException("Unable to bind on specified info port in secure " +
"context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
@@ -111,6 +135,9 @@ public class SecureDataNodeStarter imple
DataNode.secureMain(args, resources);
}
- @Override public void destroy() { /* Nothing to do */ }
+ @Override public void destroy() {
+ if (sslFactory != null) { // null when not running with SSL
+ sslFactory.destroy();
+ }
+ }
+
@Override public void stop() throws Exception { /* Nothing to do */ }
}
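
The secure starter now chooses between a plain channel connector and an SSL connector backed by Hadoop's SSLFactory, which pulls its keystore settings from the server-side SSL config (conventionally ssl-server.xml). A minimal sketch of the SSLFactory lifecycle the hunk exercises; a real run needs a keystore configured, so treat this as illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.ssl.SSLFactory;

    public class SslFactorySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SSLFactory factory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
        factory.init();  // loads keystore/truststore from the SSL config
        try {
          // Same factory method the custom SslSocketConnector overrides.
          System.out.println(factory.createSSLServerSocketFactory());
        } finally {
          factory.destroy();
        }
      }
    }
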
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Oct 16 00:02:55 2012
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
@@ -1676,10 +1677,10 @@ class FsDatasetImpl implements FsDataset
List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
- List<Integer> blocksVolumendexes = new ArrayList<Integer>(blocks.size());
+ List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
// Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < volumes.volumes.size(); i++) {
- blocksVolumeIds.add(new byte[] { (byte) i });
+ blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
// the block's volume against the enumerated volumes
@@ -1700,10 +1701,10 @@ class FsDatasetImpl implements FsDataset
if (!isValid) {
volumeIndex = Integer.MAX_VALUE;
}
- blocksVolumendexes.add(volumeIndex);
+ blocksVolumeIndexes.add(volumeIndex);
}
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
- blocksVolumeIds, blocksVolumendexes);
+ blocksVolumeIds, blocksVolumeIndexes);
}
@Override
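
Besides the spelling fix (blocksVolumendexes to blocksVolumeIndexes), each volume id above is now a fixed 4-byte big-endian encoding of the volume index rather than a single cast byte, so indexes beyond what one signed byte can hold survive the round trip. A tiny self-contained sketch of the encoding:

    import java.nio.ByteBuffer;

    public class VolumeIdSketch {
      public static void main(String[] args) {
        int volumeIndex = 300;  // would not survive a cast to byte
        // Encode exactly as the hunk above does.
        byte[] id = ByteBuffer.allocate(4).putInt(volumeIndex).array();
        int decoded = ByteBuffer.wrap(id).getInt();
        System.out.println(decoded);  // 300
      }
    }
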
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java Tue Oct 16 00:02:55 2012
@@ -283,8 +283,9 @@ public class JournalService implements J
new JournalProtocolServerSideTranslatorPB(impl);
BlockingService service =
JournalProtocolService.newReflectiveBlockingService(xlator);
- return RPC.getServer(JournalProtocolPB.class, service,
- address.getHostName(), address.getPort(), 1, false, conf, null);
+ return new RPC.Builder(conf).setProtocol(JournalProtocolPB.class)
+ .setInstance(service).setBindAddress(address.getHostName())
+ .setPort(address.getPort()).setNumHandlers(1).setVerbose(false).build();
}
private void verifyEpoch(long e) throws FencedException {
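
This is the same RPC.getServer-to-RPC.Builder migration applied to DataNode earlier in this commit; the positional arguments map onto the builder one-for-one. The general shape, with protocol, service, and address values as placeholders (omit setSecretManager where the old call passed null, as this one did):

    // Old positional form:
    //   server = RPC.getServer(MyProtocolPB.class, service, host, port,
    //       numHandlers, verbose, conf, secretManager);
    //
    // New fluent form:
    //   server = new RPC.Builder(conf)
    //       .setProtocol(MyProtocolPB.class)
    //       .setInstance(service)
    //       .setBindAddress(host)
    //       .setPort(port)
    //       .setNumHandlers(numHandlers)
    //       .setVerbose(verbose)
    //       .setSecretManager(secretManager)  // only if non-null before
    //       .build();
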
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Tue Oct 16 00:02:55 2012
@@ -114,7 +114,7 @@ class EditLogBackupOutputStream extends
}
@Override // EditLogOutputStream
- protected void flushAndSync() throws IOException {
+ protected void flushAndSync(boolean durable) throws IOException {
assert out.getLength() == 0 : "Output buffer is not empty";
int numReadyTxns = doubleBuf.countReadyTxns();