Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2011/11/08 01:16:24 UTC

svn commit: r1199024 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

Author: hairong
Date: Tue Nov  8 00:16:23 2011
New Revision: 1199024

URL: http://svn.apache.org/viewvc?rev=1199024&view=rev
Log:
HDFS-2495. Increase granularity of write operations in ReplicationMonitor thus reducing contention for write lock. Contributed by Tomasz Nykiel.
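
The heart of the change: computeReplicationWorkForBlock() acquired and released the namesystem write lock for every single block (twice per block, in fact: once to validate, once to commit), so a ReplicationMonitor pass over a large backlog of under-replicated blocks contended with client operations on every iteration. The new computeReplicationWorkForBlocks() processes the whole batch in three phases: validate all candidates under one lock acquisition, choose placement targets with no lock held, then re-validate and commit under a second acquisition. Below is a self-contained sketch of that pattern; the class, method, and datanode names are illustrative stand-ins rather than the HDFS API, and a plain ReentrantReadWriteLock stands in for the FSNamesystem lock.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class BatchedSchedulerSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final List<String> pending = new ArrayList<String>();

      // Phase 1: validate every candidate under a SINGLE write-lock
      // acquisition (the old code locked and unlocked once per block).
      List<String> gather(List<List<String>> blocksByPriority) {
        List<String> work = new LinkedList<String>();
        lock.writeLock().lock();
        try {
          for (List<String> perPriority : blocksByPriority) {
            for (String block : perPriority) {
              if (!block.isEmpty()) {  // stand-in for the INode/replica checks
                work.add(block);
              }
            }
          }
        } finally {
          lock.writeLock().unlock();
        }
        return work;
      }

      // Phase 2: expensive target selection runs with NO lock held.
      Map<String, String> chooseTargets(List<String> work) {
        Map<String, String> targets = new HashMap<String, String>();
        for (String block : work) {
          targets.put(block, "dn-" + Math.abs(block.hashCode() % 3));
        }
        return targets;
      }

      // Phase 3: re-validate and commit under one more acquisition.
      int commit(Map<String, String> targets) {
        int scheduled = 0;
        lock.writeLock().lock();
        try {
          for (Map.Entry<String, String> e : targets.entrySet()) {
            pending.add(e.getKey() + " -> " + e.getValue());
            scheduled++;
          }
        } finally {
          lock.writeLock().unlock();
        }
        return scheduled;
      }

      public static void main(String[] args) {
        BatchedSchedulerSketch s = new BatchedSchedulerSketch();
        List<List<String>> byPriority = Arrays.asList(
            Arrays.asList("blk_1", "blk_2"),
            Arrays.asList("blk_3"));
        System.out.println(s.commit(s.chooseTargets(s.gather(byPriority))));
      }
    }

Whatever the batch size, the lock is taken a constant number of times per pass instead of a number proportional to the backlog, which is where the reduced contention comes from.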

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1199024&r1=1199023&r2=1199024&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov  8 00:16:23 2011
@@ -52,9 +52,14 @@ Trunk (unreleased changes)
 
     HDFS-2334. Add Closeable to JournalManager. (Ivan Kelly via jitendra)
 
+
+  OPTIMIZATIONS
     HDFS-2477. Optimize computing the diff between a block report and the
                namenode state. (Tomasz Nykiel via hairong)
 
+    HDFS-2495. Increase granularity of write operations in ReplicationMonitor
+    thus reducing contention for write lock. (Tomasz Nykiel via hairong)
+
   BUG FIXES
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1199024&r1=1199023&r2=1199024&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Nov  8 00:16:23 2011
@@ -929,15 +929,7 @@ public class BlockManager {
       chooseUnderReplicatedBlocks(blocksToProcess);
 
     // replicate blocks
-    int scheduledReplicationCount = 0;
-    for (int i=0; i<blocksToReplicate.size(); i++) {
-      for(Block block : blocksToReplicate.get(i)) {
-        if (computeReplicationWorkForBlock(block, i)) {
-          scheduledReplicationCount++;
-        }
-      }
-    }
-    return scheduledReplicationCount;
+    return computeReplicationWorkForBlocks(blocksToReplicate);
   }
 
   /**
@@ -1002,170 +994,201 @@ public class BlockManager {
     return blocksToReplicate;
   }
 
-  /** Replicate a block
+  /** Replicate a set of blocks
    *
-   * @param block block to be replicated
-   * @param priority a hint of its priority in the neededReplication queue
-   * @return if the block gets replicated or not
+   * @param blocksToReplicate blocks to be replicated, for each priority
+   * @return the number of blocks scheduled for replication
    */
   @VisibleForTesting
-  boolean computeReplicationWorkForBlock(Block block, int priority) {
+  int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
 
+    int scheduledWork = 0;
+    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
+        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
+          for (Block block : blocksToReplicate.get(priority)) {
+            // block should belong to a file
+            fileINode = blocksMap.getINode(block);
+            // abandoned block or block reopened for append
+            if(fileINode == null || fileINode.isUnderConstruction()) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              continue;
+            }
 
-        requiredReplication = fileINode.getReplication();
+            requiredReplication = fileINode.getReplication();
 
-        // get a source data-node
-        containingNodes = new ArrayList<DatanodeDescriptor>();
-        liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        srcNode = chooseSourceDatanode(
-            block, containingNodes, liveReplicaNodes, numReplicas);
-        if(srcNode == null) // block can not be replicated from any node
-          return false;
-
-        assert liveReplicaNodes.size() == numReplicas.liveReplicas();
-        // do not schedule more if enough replicas is already pending
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-                                pendingReplications.getNumReplicas(block);
+            // get a source data-node
+            containingNodes = new ArrayList<DatanodeDescriptor>();
+            liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+            NumberReplicas numReplicas = new NumberReplicas();
+            srcNode = chooseSourceDatanode(
+                block, containingNodes, liveReplicaNodes, numReplicas);
+            if(srcNode == null) // block can not be replicated from any node
+              continue;
+
+            assert liveReplicaNodes.size() == numReplicas.liveReplicas();
+            // do not schedule more if enough replicas is already pending
+            numEffectiveReplicas = numReplicas.liveReplicas() +
+                                    pendingReplications.getNumReplicas(block);
       
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
-          }
-        }
+            if (numEffectiveReplicas >= requiredReplication) {
+              if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                   (blockHasEnoughRacks(block)) ) {
+                neededReplications.remove(block, priority); // remove from neededReplications
+                replIndex--;
+                NameNode.stateChangeLog.info("BLOCK* "
+                    + "Removing block " + block
+                    + " from neededReplications as it has enough replicas.");
+                continue;
+              }
+            }
 
-        if (numReplicas.liveReplicas() < requiredReplication) {
-          additionalReplRequired = requiredReplication - numEffectiveReplicas;
-        } else {
-          additionalReplRequired = 1; //Needed on a new rack
+            if (numReplicas.liveReplicas() < requiredReplication) {
+              additionalReplRequired = requiredReplication
+                  - numEffectiveReplicas;
+            } else {
+              additionalReplRequired = 1; // Needed on a new rack
+            }
+            work.add(new ReplicationWork(block, fileINode, srcNode,
+                containingNodes, liveReplicaNodes, additionalReplRequired,
+                priority));
+          }
         }
-
       }
     } finally {
       namesystem.writeUnlock();
     }
-    
-    // Exclude all of the containing nodes from being targets.
-    // This list includes decommissioning or corrupt nodes.
-    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
-    for (DatanodeDescriptor dn : containingNodes) {
-      excludedNodes.put(dn, dn);
-    }
 
-    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    // It is costly to extract the filename for which chooseTargets is called,
-    // so for now we pass in the Inode itself.
-    DatanodeDescriptor targets[] = 
-                       blockplacement.chooseTarget(fileINode, additionalReplRequired,
-                       srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
-    if(targets.length == 0)
-      return false;
+    HashMap<Node, Node> excludedNodes
+        = new HashMap<Node, Node>();
+    for(ReplicationWork rw : work){
+      // Exclude all of the containing nodes from being targets.
+      // This list includes decommissioning or corrupt nodes.
+      excludedNodes.clear();
+      for (DatanodeDescriptor dn : rw.containingNodes) {
+        excludedNodes.put(dn, dn);
+      }
+
+      // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+      // It is costly to extract the filename for which chooseTargets is called,
+      // so for now we pass in the Inode itself.
+      rw.targets = blockplacement.chooseTarget(rw.fileINode,
+          rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
+          excludedNodes, rw.block.getNumBytes());
+    }
 
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
-        // Recheck since global lock was released
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if(targets == null || targets.length == 0){
+          rw.targets = null;
+          continue;
         }
-        requiredReplication = fileINode.getReplication();
 
-        // do not schedule more if enough replicas is already pending
-        NumberReplicas numReplicas = countNodes(block);
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-        pendingReplications.getNumReplicas(block);
-
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
+        synchronized (neededReplications) {
+          Block block = rw.block;
+          int priority = rw.priority;
+          // Recheck since global lock was released
+          // block should belong to a file
+          fileINode = blocksMap.getINode(block);
+          // abandoned block or block reopened for append
+          if(fileINode == null || fileINode.isUnderConstruction()) {
             neededReplications.remove(block, priority); // remove from neededReplications
+            rw.targets = null;
             replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
+            continue;
           }
-        }
+          requiredReplication = fileINode.getReplication();
 
-        if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-             (!blockHasEnoughRacks(block)) ) {
-          if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
-            //No use continuing, unless a new rack in this case
-            return false;
+          // do not schedule more if enough replicas is already pending
+          NumberReplicas numReplicas = countNodes(block);
+          numEffectiveReplicas = numReplicas.liveReplicas() +
+            pendingReplications.getNumReplicas(block);
+
+          if (numEffectiveReplicas >= requiredReplication) {
+            if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                 (blockHasEnoughRacks(block)) ) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              rw.targets = null;
+              NameNode.stateChangeLog.info("BLOCK* "
+                  + "Removing block " + block
+                  + " from neededReplications as it has enough replicas.");
+              continue;
+            }
           }
-        }
 
-        // Add block to the to be replicated list
-        srcNode.addBlockToBeReplicated(block, targets);
+          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+               (!blockHasEnoughRacks(block)) ) {
+            if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+              //No use continuing, unless a new rack in this case
+              continue;
+            }
+          }
 
-        for (DatanodeDescriptor dn : targets) {
-          dn.incBlocksScheduled();
-        }
+          // Add block to the to be replicated list
+          rw.srcNode.addBlockToBeReplicated(block, targets);
+          scheduledWork++;
 
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        pendingReplications.add(block, targets.length);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "BLOCK* block " + block
-              + " is moved from neededReplications to pendingReplications");
-        }
+          for (DatanodeDescriptor dn : targets) {
+            dn.incBlocksScheduled();
+          }
+
+          // Move the block-replication into a "pending" state.
+          // The reason we use 'pending' is so we can retry
+          // replications that fail after an appropriate amount of time.
+          pendingReplications.add(block, targets.length);
+          if(NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug(
+                "BLOCK* block " + block
+                + " is moved from neededReplications to pendingReplications");
+          }
 
-        // remove from neededReplications
-        if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
+          // remove from neededReplications
+          if(numEffectiveReplicas + targets.length >= requiredReplication) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+          }
         }
-        if (NameNode.stateChangeLog.isInfoEnabled()) {
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+
+    if (NameNode.stateChangeLog.isInfoEnabled()) {
+      // log which blocks have been scheduled for replication
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
           for (int k = 0; k < targets.length; k++) {
             targetList.append(' ');
             targetList.append(targets[k].getName());
           }
           NameNode.stateChangeLog.info(
-                    "BLOCK* ask "
-                    + srcNode.getName() + " to replicate "
-                    + block + " to " + targetList);
-          if(NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "BLOCK* neededReplications = " + neededReplications.size()
-                + " pendingReplications = " + pendingReplications.size());
-          }
+                  "BLOCK* ask "
+                  + rw.srcNode.getName() + " to replicate "
+                  + rw.block + " to " + targetList);
         }
       }
-    } finally {
-      namesystem.writeUnlock();
+    }
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+          "BLOCK* neededReplications = " + neededReplications.size()
+          + " pendingReplications = " + pendingReplications.size());
     }
 
-    return true;
+    return scheduledWork;
   }
 
   /**
@@ -2596,4 +2619,34 @@ public class BlockManager {
     return workFound;
   }
 
+  private static class ReplicationWork {
+
+    private Block block;
+    private INodeFile fileINode;
+
+    private DatanodeDescriptor srcNode;
+    private List<DatanodeDescriptor> containingNodes;
+    private List<DatanodeDescriptor> liveReplicaNodes;
+    private int additionalReplRequired;
+
+    private DatanodeDescriptor targets[];
+    private int priority;
+
+    public ReplicationWork(Block block,
+        INodeFile fileINode,
+        DatanodeDescriptor srcNode,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeDescriptor> liveReplicaNodes,
+        int additionalReplRequired,
+        int priority) {
+      this.block = block;
+      this.fileINode = fileINode;
+      this.srcNode = srcNode;
+      this.containingNodes = containingNodes;
+      this.liveReplicaNodes = liveReplicaNodes;
+      this.additionalReplRequired = additionalReplRequired;
+      this.priority = priority;
+      this.targets = null;
+    }
+  }
 }
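
A note on the ReplicationWork holder above: it exists so that per-block state computed under the first lock acquisition (source node, containing and live-replica nodes, priority) survives into the unlocked placement phase and the final commit phase. targets is the one field written outside the first locked section, and resetting it to null is how the commit loop tells the later logging pass to skip an item. A toy illustration of that convention follows; WorkItem is a made-up stand-in, not the HDFS class.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SkipConventionSketch {
      static class WorkItem {
        final String block;
        String[] targets;                  // stays null if placement fails
        WorkItem(String block) { this.block = block; }
      }

      public static void main(String[] args) {
        List<WorkItem> work = new ArrayList<WorkItem>();
        WorkItem placed = new WorkItem("blk_1");
        placed.targets = new String[] { "dn-a", "dn-b" };
        work.add(placed);
        work.add(new WorkItem("blk_2"));   // dropped: targets stays null

        // Logging pass, outside any lock: skip items the commit phase dropped.
        for (WorkItem w : work) {
          if (w.targets == null || w.targets.length == 0) {
            continue;
          }
          System.out.println("ask dn to replicate " + w.block
              + " to " + Arrays.toString(w.targets));
        }
      }
    }

Deferring the "BLOCK* ask ... to replicate" info logging until after writeUnlock(), as the patch does, is itself part of the win: building those log strings for a large batch no longer happens inside the global lock.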

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1199024&r1=1199023&r2=1199024&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Nov  8 00:16:23 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -355,25 +356,35 @@ public class TestBlockManager {
     bm.blocksMap.addINode(blockInfo, iNode);
     return blockInfo;
   }
-  
+
   private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
-    assertEquals("Block not initially pending replication",
-        0, bm.pendingReplications.getNumReplicas(block));
-    assertTrue("computeReplicationWork should indicate replication is needed",
-        bm.computeReplicationWorkForBlock(block, 1));
+    // list for priority 1
+    List<Block> list_p1 = new ArrayList<Block>();
+    list_p1.add(block);
+
+    // list of lists for each priority
+    List<List<Block>> list_all = new ArrayList<List<Block>>();
+    list_all.add(new ArrayList<Block>()); // for priority 0
+    list_all.add(list_p1); // for priority 1
+
+    assertEquals("Block not initially pending replication", 0,
+        bm.pendingReplications.getNumReplicas(block));
+    assertEquals(
+        "computeReplicationWork should indicate replication is needed", 1,
+        bm.computeReplicationWorkForBlocks(list_all));
     assertTrue("replication is pending after work is computed",
         bm.pendingReplications.getNumReplicas(block) > 0);
-    
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
-      getAllPendingReplications();
+
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
     assertEquals(1, repls.size());
-    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next();
+    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries()
+        .iterator().next();
     DatanodeDescriptor[] targets = repl.getValue().targets;
-    
+
     DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
     pipeline[0] = repl.getKey();
     System.arraycopy(targets, 0, pipeline, 1, targets.length);
-    
+
     return pipeline;
   }
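
For callers, the shape of the new entry point as the updated test exercises it: one List<Block> per priority level, empty lists for priorities with no work, and a return value counting the blocks actually scheduled (replacing the old per-block boolean). A hedged fragment, assuming a BlockManager bm and a Block block as set up in this test, with java.util.Collections imported:

    // One inner list per priority; the outer index doubles as the priority
    // passed to neededReplications.remove(block, priority).
    List<List<Block>> byPriority = new ArrayList<List<Block>>();
    byPriority.add(new ArrayList<Block>());            // priority 0: nothing
    byPriority.add(Collections.singletonList(block));  // priority 1: our block

    int scheduled = bm.computeReplicationWorkForBlocks(byPriority);
    assertEquals(1, scheduled);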