You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/07 20:32:36 UTC

svn commit: r483627 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSConstants.java src/java/org/apache/hadoop/dfs/FSNamesystem.java

Author: cutting
Date: Thu Dec  7 11:32:35 2006
New Revision: 483627

URL: http://svn.apache.org/viewvc?view=rev&rev=483627
Log:
HADOOP-774.  Limit the number of invalid blocks returned with heartbeats by the namenode to datanodes.  Contributed by Dhruba.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483627&r1=483626&r2=483627
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec  7 11:32:35 2006
@@ -18,6 +18,11 @@
  5. HADOOP-629. Fix RPC services to better check the protocol name and
     version.  (omalley via cutting)
 
+ 6. HADOOP-774. Limit the number of invalid blocks returned with
+    heartbeats by the namenode to datanodes.  Transmitting and
+    processing very large invalid block lists can tie up both the
+    namenode and datanode for too long.  (Dhruba Borthakur via cutting) 
+
 
 Release 0.9.1 - 2006-12-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=483627&r1=483626&r2=483627
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu Dec  7 11:32:35 2006
@@ -99,6 +99,9 @@
     public static final int STILL_WAITING = 1;
     public static final int COMPLETE_SUCCESS = 2;
 
+    // Chunk the block Invalidate message
+    public static final int BLOCK_INVALIDATE_CHUNK = 100;
+
     //
     // Timeouts, constants
     //

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=483627&r1=483626&r2=483627
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Dec  7 11:32:35 2006
@@ -1809,23 +1809,54 @@
         // only if safe mode is off.
         if( isInSafeMode() )
           return null;
-        
+       
         Collection<Block> invalidateSet = recentInvalidateSets.remove( 
                                                       nodeID.getStorageID() );
  
-        if (invalidateSet == null ) 
+        if (invalidateSet == null) {
             return null;
+        }
+
+        Iterator<Block> it = null;
+        int sendNum = invalidateSet.size();
+        int origSize = sendNum;
+        ArrayList sendBlock = new ArrayList(sendNum);
+
+        //
+        // calculate the number of blocks that we send in one message
+        //
+        if (sendNum > FSConstants.BLOCK_INVALIDATE_CHUNK) {
+            sendNum =  FSConstants.BLOCK_INVALIDATE_CHUNK;
+        }
+        //
+        // Copy the first chunk into sendBlock
+        //
+        for (it = invalidateSet.iterator(); sendNum > 0; sendNum--) {
+            assert(it.hasNext());
+            sendBlock.add(it.next());
+            it.remove();
+        }
+
+        //
+        // If we could not send everything in this message, reinsert this item
+        // into the collection.
+        //
+        if (it.hasNext()) {
+            assert(origSize > FSConstants.BLOCK_INVALIDATE_CHUNK);
+            recentInvalidateSets.put(nodeID.getStorageID(), invalidateSet);
+        }
         
-        if(NameNode.stateChangeLog.isInfoEnabled()) {
+        if (NameNode.stateChangeLog.isInfoEnabled()) {
             StringBuffer blockList = new StringBuffer();
-            for( Iterator<Block> it = invalidateSet.iterator(); it.hasNext(); ) {
+            for (int i = 0; i < sendBlock.size(); i++) {
                 blockList.append(' ');
-                blockList.append(it.next().getBlockName());
+                Block block = (Block) sendBlock.get(i);
+                blockList.append(block.getBlockName());
             }
             NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockToInvalidate: "
                    +"ask "+nodeID.getName()+" to delete " + blockList );
         }
-        return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
+        return (Block[]) sendBlock.toArray(new Block[sendBlock.size()]);
     }
 
     /**