Posted to hdfs-commits@hadoop.apache.org by tg...@apache.org on 2013/01/04 15:28:52 UTC
svn commit: r1428883 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanage...
Author: tgraves
Date: Fri Jan 4 14:28:52 2013
New Revision: 1428883
URL: http://svn.apache.org/viewvc?rev=1428883&view=rev
Log:
HDFS-4270. Replications of the highest priority should be allowed to choose a source datanode that has reached its max replication limit (Derek Dagit via tgraves)
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
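In short, the change splits the per-datanode limit on outgoing replication streams into two tiers: dfs.namenode.replication.max-streams becomes a soft limit that highest-priority replications may exceed, while the new dfs.namenode.replication.max-streams-hard-limit (default 4) is never exceeded. The following is a minimal sketch of the resulting selection rule, using illustrative names rather than the actual BlockManager code:

  // Sketch of the two-tier limit checked when choosing a replication
  // source node; names are illustrative, not the real BlockManager fields.
  static boolean isEligibleSource(int pendingReplications, int priority,
                                  int softLimit, int hardLimit,
                                  int highestPriority) {
    if (pendingReplications >= hardLimit) {
      return false; // the hard limit applies at every priority
    }
    // Below the hard limit, only highest-priority replications may
    // exceed the soft limit.
    return priority == highestPriority || pendingReplications < softLimit;
  }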
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jan 4 14:28:52 2013
@@ -29,6 +29,10 @@ Release 0.23.6 - UNRELEASED
HDFS-4315. DNs with multiple BPs can have BPOfferServices fail to start
due to unsynchronized map access. (atm via tgraves)
+ HDFS-4270. Replications of the highest priority should be allowed to
+ choose a source datanode that has reached its max replication limit
+ (Derek Dagit via tgraves)
+
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 4 14:28:52 2013
@@ -118,6 +118,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
+ public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
+ public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
public static final String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
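As a usage sketch (not part of this commit), the new key is read like any other HDFS setting; Configuration.getInt falls back to the compiled-in default when hdfs-site.xml does not override it:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class HardLimitExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Falls back to DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT
      // (4) when the key is absent from the loaded configuration.
      int hardLimit = conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
      System.out.println("replication streams hard limit = " + hardLimit);
    }
  }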
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jan 4 14:28:52 2013
@@ -156,10 +156,16 @@ public class BlockManager {
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
- /** The maximum number of outgoing replication streams
- * a given node should have at one time
+ /**
+ * The maximum number of outgoing replication streams a given node should have
+ * at one time considering all but the highest priority replications needed.
*/
int maxReplicationStreams;
+ /**
+ * The maximum number of outgoing replication streams a given node should have
+ * at one time.
+ */
+ int replicationStreamsHardLimit;
/** Minimum copies needed or else write is disallowed */
public final short minReplication;
/** Default number of replicas */
@@ -219,10 +225,16 @@ public class BlockManager {
this.minReplication = (short)minR;
this.maxReplication = (short)maxR;
- this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
- this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
- : true;
+ this.maxReplicationStreams =
+ conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+ this.replicationStreamsHardLimit =
+ conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+ this.shouldCheckForEnoughRacks =
+ conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+ ? false : true;
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
@@ -329,7 +341,8 @@ public class BlockManager {
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
chooseSourceDatanode(block, containingNodes,
- containingLiveReplicasNodes, numReplicas);
+ containingLiveReplicasNodes, numReplicas,
+ UnderReplicatedBlocks.LEVEL);
assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas();
@@ -1047,9 +1060,11 @@ public class BlockManager {
liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
- block, containingNodes, liveReplicaNodes, numReplicas);
+ block, containingNodes, liveReplicaNodes, numReplicas, priority);
if(srcNode == null) // block can not be replicated from any node
+ {
return false;
+ }
assert liveReplicaNodes.size() == numReplicas.liveReplicas();
// do not schedule more if enough replicas are already pending
@@ -1212,16 +1227,34 @@ public class BlockManager {
* since the former do not have write traffic and hence are less busy.
* We do not use already decommissioned nodes as a source.
* Otherwise we choose a random node among those that did not reach their
- * replication limit.
+ * replication limits. However, if the replication is of the highest priority
+ * and all nodes have reached their replication limits, we will choose a
+ * random node despite the replication limit.
*
* In addition form a list of all nodes containing the block
* and calculate its replication numbers.
+ *
+ * @param block Block for which a replication source is needed
+ * @param containingNodes List to be populated with nodes found to contain the
+ * given block
+ * @param nodesContainingLiveReplicas List to be populated with nodes found to
+ * contain live replicas of the given block
+ * @param numReplicas NumberReplicas instance to be initialized with the
+ * counts of live, corrupt, excess, and
+ * decommissioned replicas of the given
+ * block.
+ * @param priority integer representing replication priority of the given
+ * block
+ * @return the DatanodeDescriptor of the chosen node from which to replicate
+ * the given block
*/
- private DatanodeDescriptor chooseSourceDatanode(
+ @VisibleForTesting
+ DatanodeDescriptor chooseSourceDatanode(
Block block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeDescriptor> nodesContainingLiveReplicas,
- NumberReplicas numReplicas) {
+ NumberReplicas numReplicas,
+ int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
DatanodeDescriptor srcNode = null;
@@ -1250,8 +1283,15 @@ public class BlockManager {
// If so, do not select the node as src node
if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
continue;
- if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+ if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+ && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+ {
continue; // already reached replication limit
+ }
+ if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+ {
+ continue; // already reached the replication streams hard limit
+ }
// the block must not be scheduled for removal on srcNode
if(excessBlocks != null && excessBlocks.contains(block))
continue;
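One detail worth noting as an aside: the first updated call site above passes UnderReplicatedBlocks.LEVEL as the priority. Assuming the branch-0.23 constants (QUEUE_HIGHEST_PRIORITY is 0 and LEVEL is the total number of priority queues), LEVEL can never equal QUEUE_HIGHEST_PRIORITY, so that path deliberately keeps the original soft-limit behavior. A small fragment illustrating why (UnderReplicatedBlocks is package-private, so this assumes code in the same blockmanagement package):

  // Assumption: QUEUE_HIGHEST_PRIORITY == 0 and LEVEL == the number of
  // priority queues, as in branch-0.23's UnderReplicatedBlocks.
  int priority = UnderReplicatedBlocks.LEVEL; // sentinel, not a real queue
  // Inside chooseSourceDatanode() the bypass condition is therefore false,
  // and the soft maxReplicationStreams check still applies:
  boolean maySkipSoftLimit =
      (priority == UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);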
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Fri Jan 4 14:28:52 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.bl
import static org.junit.Assert.*;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
@@ -388,4 +389,57 @@ public class TestBlockManager {
}
return repls;
}
+
+ /**
+ * Test that a source node for a highest-priority replication is chosen even if all available
+ * source nodes have reached their replication limits.
+ */
+ @Test
+ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
+ bm.maxReplicationStreams = 0;
+ bm.replicationStreamsHardLimit = 1;
+
+ long blockId = 42; // arbitrary
+ Block aBlock = new Block(blockId, 0, 0);
+
+ List<DatanodeDescriptor> origNodes = nodes(0, 1);
+ // Add the block to the first node.
+ addBlockOnNodes(blockId, origNodes.subList(0, 1));
+
+ List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
+ List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+
+ assertNotNull("Chooses source node for a highest-priority replication"
+ + " even if all available source nodes have reached their replication"
+ + " limits below the hard limit.",
+ bm.chooseSourceDatanode(
+ aBlock,
+ cntNodes,
+ liveNodes,
+ new NumberReplicas(),
+ UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+
+ assertNull("Does not choose a source node for a less-than-highest-priority"
+ + " replication since all available source nodes have reached"
+ + " their replication limits.",
+ bm.chooseSourceDatanode(
+ aBlock,
+ cntNodes,
+ liveNodes,
+ new NumberReplicas(),
+ UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+
+ // Increase the replication count to test replication count > hard limit
+ DatanodeDescriptor[] targets = { origNodes.get(1) };
+ origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
+
+ assertNull("Does not choose a source node for a highest-priority"
+ + " replication when all available nodes exceed the hard limit.",
+ bm.chooseSourceDatanode(
+ aBlock,
+ cntNodes,
+ liveNodes,
+ new NumberReplicas(),
+ UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+ }
}