Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2009/02/07 00:13:25 UTC

svn commit: r741776 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java

Author: szetszwo
Date: Fri Feb  6 23:13:25 2009
New Revision: 741776

URL: http://svn.apache.org/viewvc?rev=741776&view=rev
Log:
HADOOP-5124. Choose datanodes randomly instead of starting from the first datanode for providing fairness.  (hairong via szetszwo)
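
In outline, the change replaces the old policy of always draining whichever datanode happens to come first in recentInvalidateSets with one that services randomly chosen datanodes on each pass, so a single node cannot monopolize the block-deletion work. A minimal sketch of the two selection policies (the class and method names below are illustrative only, not part of this patch, and both assume a non-empty set of queued storage IDs):

import java.util.Random;
import java.util.SortedSet;

class NodeSelectionSketch {
  private static final Random r = new Random();

  /** Old policy: always the first key, so the same datanode tends to win every round. */
  static String pickFirst(SortedSet<String> queuedStorageIds) {
    return queuedStorageIds.first();
  }

  /** New policy: every queued datanode is equally likely to be serviced this round. */
  static String pickRandom(SortedSet<String> queuedStorageIds) {
    int skip = r.nextInt(queuedStorageIds.size());
    for (String storageId : queuedStorageIds) {
      if (skip-- == 0) {
        return storageId;
      }
    }
    throw new AssertionError("unreachable for a non-empty set");
  }
}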

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=741776&r1=741775&r2=741776&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb  6 23:13:25 2009
@@ -71,6 +71,9 @@
     HADOOP-3327. Improves handling of READ_TIMEOUT during map output copying.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-5124. Choose datanodes randomly instead of starting from the first
+    datanode for providing fairness.  (hairong via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=741776&r1=741775&r2=741776&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Feb  6 23:13:25 2009
@@ -264,7 +264,7 @@
   private final GenerationStamp generationStamp = new GenerationStamp();
 
   // Ask Datanode only up to this many blocks to delete.
-  private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
+  int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
 
   // precision of access times.
   private long accessTimePrecision = 0;
@@ -1146,8 +1146,12 @@
           // remove this block from the list of pending blocks to be deleted. 
           // This reduces the possibility of triggering HADOOP-1349.
           //
-          for(Collection<Block> v : recentInvalidateSets.values()) {
-            v.remove(last);
+          for (DatanodeDescriptor dd : targets) {
+            String datanodeId = dd.getStorageID();
+            Collection<Block> v = recentInvalidateSets.get(datanodeId);
+            if (v != null && v.remove(last) && v.isEmpty()) {
+              recentInvalidateSets.remove(datanodeId);
+            }
           }
         }
       }
@@ -1451,7 +1455,7 @@
    * @param b block
    * @param n datanode
    */
-  private void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
+  void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
     Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
     if (invalidateSet == null) {
       invalidateSet = new HashSet<Block>();
@@ -2296,13 +2300,37 @@
     return workFound;
   }
 
-  private int computeInvalidateWork(int nodesToProcess) {
+  /**
+   * Schedule blocks for deletion at datanodes
+   * @param nodesToProcess number of datanodes to schedule deletion work
+   * @return total number of blocks scheduled for deletion
+   */
+  int computeInvalidateWork(int nodesToProcess) {
+    int numOfNodes = recentInvalidateSets.size();
+    nodesToProcess = Math.min(numOfNodes, nodesToProcess);
+    
+    // get an array of the keys
+    ArrayList<String> keyArray =
+      new ArrayList<String>(recentInvalidateSets.keySet());
+
+    // randomly pick up <i>nodesToProcess</i> nodes 
+    // and put them at [0, nodesToProcess)
+    int remainingNodes = numOfNodes - nodesToProcess;
+    if (nodesToProcess < remainingNodes) {
+      for(int i=0; i<nodesToProcess; i++) {
+        int keyIndex = r.nextInt(numOfNodes-i)+i;
+        Collections.swap(keyArray, keyIndex, i); // swap to front
+      }
+    } else {
+      for(int i=0; i<remainingNodes; i++) {
+        int keyIndex = r.nextInt(numOfNodes-i);
+        Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
+      }
+    }
+    
     int blockCnt = 0;
     for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
-      int work = invalidateWorkForOneNode();
-      if(work == 0)
-        break;
-      blockCnt += work;
+      blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
     }
     return blockCnt;
   }
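
The selection loop in the hunk above is a partial Fisher-Yates shuffle: it only randomizes as many array positions as it actually needs, and it shuffles from whichever end of the array requires fewer swaps. A self-contained sketch of the same idea (the helper class and method names here are hypothetical, not code from the patch):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class PartialShuffleSketch {
  private static final Random r = new Random();

  /**
   * Return up to k keys chosen uniformly at random from the given set,
   * doing only min(k, n - k) swaps, as in the hunk above.
   */
  static List<String> pickRandomKeys(Collection<String> keys, int k) {
    ArrayList<String> keyArray = new ArrayList<String>(keys);
    int n = keyArray.size();
    k = Math.min(k, n);
    if (k < n - k) {
      // Few keys wanted: swap random picks to the front, filling [0, k).
      for (int i = 0; i < k; i++) {
        Collections.swap(keyArray, r.nextInt(n - i) + i, i);
      }
    } else {
      // Most keys wanted: sweep the unwanted keys to the end instead.
      for (int i = 0; i < n - k; i++) {
        Collections.swap(keyArray, r.nextInt(n - i), n - i - 1);
      }
    }
    return keyArray.subList(0, k);
  }
}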
@@ -2498,24 +2526,25 @@
   }
 
   /**
-   * Get blocks to invalidate for the first node 
+   * Get blocks to invalidate for <i>nodeId</i> 
    * in {@link #recentInvalidateSets}.
    * 
    * @return number of blocks scheduled for removal during this iteration.
    */
-  private synchronized int invalidateWorkForOneNode() {
+  private synchronized int invalidateWorkForOneNode(String nodeId) {
     // blocks should not be replicated or removed if safe mode is on
     if (isInSafeMode())
       return 0;
-    if(recentInvalidateSets.isEmpty())
+    // get blocks to invalidate for the nodeId
+    assert nodeId != null;
+    DatanodeDescriptor dn = datanodeMap.get(nodeId);
+    if (dn == null) {
+      recentInvalidateSets.remove(nodeId);
       return 0;
-    // get blocks to invalidate for the first node
-    String firstNodeId = recentInvalidateSets.keySet().iterator().next();
-    assert firstNodeId != null;
-    DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
-    Collection<Block> invalidateSet = recentInvalidateSets.remove(firstNodeId);
- 
-    if(invalidateSet == null || dn == null)
+    }
+    
+    Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
+    if (invalidateSet == null)
       return 0;
 
     ArrayList<Block> blocksToInvalidate = 
@@ -2529,10 +2558,9 @@
       it.remove();
     }
 
-    // If we could not send everything in this message, reinsert this item
-    // into the collection.
-    if(it.hasNext())
-      recentInvalidateSets.put(firstNodeId, invalidateSet);
+    // If we send everything in this message, remove this node entry
+    if(!it.hasNext())
+      recentInvalidateSets.remove(nodeId);
 
     dn.addBlocksToBeInvalidated(blocksToInvalidate);
 

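Taken together, the FSNamesystem hunks above keep one recentInvalidateSets entry per storage ID: created lazily, drained at most blockInvalidateLimit blocks at a time, and removed as soon as it becomes empty or its datanode disappears. A minimal stand-alone sketch of that bookkeeping (the class name, field names, and the use of plain block IDs instead of Block objects are simplifications, not code from the patch):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

/** Sketch of the per-datanode "blocks to invalidate" bookkeeping. */
class PendingInvalidatesSketch {
  // storage ID -> blocks queued for deletion on that datanode
  private final TreeMap<String, Collection<Long>> pending =
      new TreeMap<String, Collection<Long>>();
  private final int limit;  // plays the role of blockInvalidateLimit

  PendingInvalidatesSketch(int limit) { this.limit = limit; }

  /** Queue one block for deletion on the given datanode. */
  void add(String storageId, long blockId) {
    Collection<Long> set = pending.get(storageId);
    if (set == null) {
      set = new HashSet<Long>();
      pending.put(storageId, set);
    }
    set.add(blockId);
  }

  /** Drain at most 'limit' blocks for one datanode; drop the entry once empty. */
  List<Long> drain(String storageId) {
    List<Long> out = new ArrayList<Long>();
    Collection<Long> set = pending.get(storageId);
    if (set == null) {
      return out;
    }
    for (Iterator<Long> it = set.iterator(); it.hasNext() && out.size() < limit; ) {
      out.add(it.next());
      it.remove();
    }
    if (set.isEmpty()) {
      pending.remove(storageId);  // mirrors "remove this node entry" above
    }
    return out;
  }
}
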
Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java?rev=741776&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java Fri Feb  6 23:13:25 2009
@@ -0,0 +1,57 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+
+import junit.framework.TestCase;
+
+/**
+ * Test if FSNamesystem correctly schedules block invalidation (deletion) work
+ */
+public class TestComputeInvalidateWork extends TestCase {
+  /**
+   * Test if {@link FSNamesystem#computeInvalidateWork(int)}
+   * can schedule invalidate work correctly 
+   */
+  public void testCompInvalidate() throws Exception {
+    final Configuration conf = new Configuration();
+    final int NUM_OF_DATANODES = 3;
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, NUM_OF_DATANODES, true, null);
+    try {
+      cluster.waitActive();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      DatanodeDescriptor[] nodes =
+        namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
+      assertEquals(nodes.length, NUM_OF_DATANODES);
+      
+      synchronized (namesystem) {
+      for (int i=0; i<nodes.length; i++) {
+        for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
+          Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0, 
+              GenerationStamp.FIRST_VALID_STAMP);
+          namesystem.addToInvalidatesNoLog(block, nodes[i]);
+        }
+      }
+      
+      assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES, 
+          namesystem.computeInvalidateWork(NUM_OF_DATANODES+1));
+      assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES, 
+          namesystem.computeInvalidateWork(NUM_OF_DATANODES));
+      assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1), 
+          namesystem.computeInvalidateWork(NUM_OF_DATANODES-1));
+      int workCount = namesystem.computeInvalidateWork(1);
+      if (workCount == 1) {
+        assertEquals(namesystem.blockInvalidateLimit+1, 
+            namesystem.computeInvalidateWork(2));        
+      } else {
+        assertEquals(workCount, namesystem.blockInvalidateLimit);
+        assertEquals(2, namesystem.computeInvalidateWork(2));
+      }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
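
For reference, the assertions follow from simple counting: each of the 3 datanodes starts with 3*blockInvalidateLimit + 1 queued blocks, and every selected datanode yields at most blockInvalidateLimit blocks per call. The first two calls therefore drain blockInvalidateLimit from all three nodes, the third call (with an argument of 2) drains two of them down to a single leftover block each, and the final computeInvalidateWork(1) either hits one of those nearly empty nodes (work of 1, so the following computeInvalidateWork(2) schedules blockInvalidateLimit + 1 blocks) or hits the node still holding blockInvalidateLimit + 1 blocks (work of blockInvalidateLimit, so the following call schedules 1 + 1 = 2 blocks), which is exactly the branch the test distinguishes.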