Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/01/04 09:13:53 UTC

svn commit: r1428743 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server...

Author: szetszwo
Date: Fri Jan  4 08:13:52 2013
New Revision: 1428743

URL: http://svn.apache.org/viewvc?rev=1428743&view=rev
Log:
svn merge -c 1428739 from trunk for HDFS-4270. Introduce soft and hard limits for max replication so that replications of the highest priority are allowed to choose a source datanode that has reached its soft limit but not the hard limit.
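
In short, the commit turns the old single per-node limit into a soft limit
(the existing dfs.namenode.replication.max-streams) plus a new hard limit
(dfs.namenode.replication.max-streams-hard-limit).  A minimal sketch of the
resulting selection rule, paraphrased from the BlockManager diff below (the
helper method and its name are illustrative, not part of the commit):

    // Returns true if the node may still be picked as a replication source
    // for a block of the given priority.
    boolean underStreamLimits(DatanodeDescriptor node, int priority) {
      int streams = node.getNumberOfBlocksToBeReplicated();
      if (streams >= replicationStreamsHardLimit) {
        return false; // the hard limit applies to every priority
      }
      // Only highest-priority replications may exceed the soft limit.
      return priority == UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
          || streams < maxReplicationStreams;
    }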

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1428739

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1428743&r1=1428742&r2=1428743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jan  4 08:13:52 2013
@@ -153,6 +153,11 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4326. bump up Tomcat version for HttpFS to 6.0.36. (tucu via acmurthy)
 
+    HDFS-4270. Introduce soft and hard limits for max replication so that
+    replications of the highest priority are allowed to choose a source datanode
+    that has reached its soft limit but not the hard limit.  (Derek Dagit via
+    szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1428739

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1428743&r1=1428742&r2=1428743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan  4 08:13:52 2013
@@ -143,6 +143,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
   public static final String  DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
   public static final int     DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
+  public static final String  DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
+  public static final int     DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
   public static final String  DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
   public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
   public static final String  DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
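
The hard limit is expected to be at least the soft limit (defaults 4 and 2
above).  A hedged example of overriding both defaults programmatically, for
instance when standing up a test cluster (the values are illustrative;
HdfsConfiguration and Configuration.setInt are standard Hadoop APIs):

    // Allow 2 outgoing replication streams per node for ordinary work,
    // but let highest-priority replications push a node up to 4.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 4);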

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1428743&r1=1428742&r2=1428743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jan  4 08:13:52 2013
@@ -190,10 +190,16 @@ public class BlockManager {
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
-  /** The maximum number of outgoing replication streams
-   *  a given node should have at one time 
-   */
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time considering all but the highest priority replications needed.
+    */
   int maxReplicationStreams;
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time.
+   */
+  int replicationStreamsHardLimit;
   /** Minimum copies needed or else write is disallowed */
   public final short minReplication;
   /** Default number of replicas */
@@ -264,9 +270,16 @@ public class BlockManager {
     this.minReplication = (short)minR;
     this.maxReplication = (short)maxR;
 
-    this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
-                                             DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
+    this.maxReplicationStreams =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+    this.replicationStreamsHardLimit =
+        conf.getInt(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+    this.shouldCheckForEnoughRacks =
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+            ? false : true;
 
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -436,7 +449,8 @@ public class BlockManager {
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
     chooseSourceDatanode(block, containingNodes,
-        containingLiveReplicasNodes, numReplicas);
+        containingLiveReplicasNodes, numReplicas,
+        UnderReplicatedBlocks.LEVEL);
     assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
     int usableReplicas = numReplicas.liveReplicas() +
                          numReplicas.decommissionedReplicas();
@@ -1146,11 +1160,12 @@ public class BlockManager {
             liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
             NumberReplicas numReplicas = new NumberReplicas();
             srcNode = chooseSourceDatanode(
-                block, containingNodes, liveReplicaNodes, numReplicas);
+                block, containingNodes, liveReplicaNodes, numReplicas,
+                priority);
             if(srcNode == null) { // block can not be replicated from any node
               LOG.debug("Block " + block + " cannot be repl from any node");
               continue;
-          }
+            }
 
             assert liveReplicaNodes.size() == numReplicas.liveReplicas();
             // do not schedule more if enough replicas is already pending
@@ -1340,16 +1355,34 @@ public class BlockManager {
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
    * Otherwise we choose a random node among those that did not reach their
-   * replication limit.
+   * replication limits.  However, if the replication is of the highest priority
+   * and all nodes have reached their replication limits, we will choose a
+   * random node despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
+   *
+   * @param block Block for which a replication source is needed
+   * @param containingNodes List to be populated with nodes found to contain the 
+   *                        given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found to
+   *                                    contain live replicas of the given block
+   * @param numReplicas NumberReplicas instance to be initialized with the 
+   *                                   counts of live, corrupt, excess, and
+   *                                   decommissioned replicas of the given
+   *                                   block.
+   * @param priority integer representing replication priority of the given
+   *                 block
+   * @return the DatanodeDescriptor of the chosen node from which to replicate
+   *         the given block
    */
-  private DatanodeDescriptor chooseSourceDatanode(
+   @VisibleForTesting
+   DatanodeDescriptor chooseSourceDatanode(
                                     Block block,
                                     List<DatanodeDescriptor> containingNodes,
                                     List<DatanodeDescriptor> nodesContainingLiveReplicas,
-                                    NumberReplicas numReplicas) {
+                                    NumberReplicas numReplicas,
+                                    int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
@@ -1378,8 +1411,15 @@ public class BlockManager {
       // If so, do not select the node as src node
       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
-      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      {
         continue; // already reached replication limit
+      }
+      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+      {
+        continue;
+      }
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
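
A note on the new priority parameter: the caller whose comment says the
returned source node is not used passes UnderReplicatedBlocks.LEVEL; in this
code LEVEL is the number of priority queues rather than a real queue index,
so it never equals QUEUE_HIGHEST_PRIORITY and the soft limit stays in force
on that path.  Only callers that pass a genuine queue priority, such as the
replication-work scheduler above, can take the relaxed branch.  The net
behavior for a candidate node, summarized from the last hunk:

    // streams >= hard limit:         skipped for every priority
    // soft <= streams < hard limit:  eligible only for QUEUE_HIGHEST_PRIORITY
    // streams < soft limit:          eligible for any priority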

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1428743&r1=1428742&r2=1428743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Fri Jan  4 08:13:52 2013
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.bl
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -429,4 +432,57 @@ public class TestBlockManager {
     }
     return repls;
   }
+
+  /**
+   * Test that a source node for a highest-priority replication is chosen even if all available
+   * source nodes have reached their replication limits.
+   */
+  @Test
+  public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
+    bm.maxReplicationStreams = 0;
+    bm.replicationStreamsHardLimit = 1;
+
+    long blockId = 42;         // arbitrary
+    Block aBlock = new Block(blockId, 0, 0);
+
+    List<DatanodeDescriptor> origNodes = getNodes(0, 1);
+    // Add the block to the first node.
+    addBlockOnNodes(blockId,origNodes.subList(0,1));
+
+    List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+
+    assertNotNull("Chooses source node for a highest-priority replication"
+        + " even if all available source nodes have reached their replication"
+        + " limits below the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+
+    assertNull("Does not choose a source node for a less-than-highest-priority"
+        + " replication since all available source nodes have reached"
+        + " their replication limits.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+
+    // Increase the replication count to test replication count > hard limit
+    DatanodeDescriptor targets[] = { origNodes.get(1) };
+    origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
+
+    assertNull("Does not choose a source node for a highest-priority"
+        + " replication when all available nodes exceed the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+  }
 }