Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2009/04/07 19:59:09 UTC

svn commit: r762879 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java

Author: hairong
Date: Tue Apr  7 17:59:09 2009
New Revision: 762879

URL: http://svn.apache.org/viewvc?rev=762879&view=rev
Log:
HADOOP-3810. NameNode seems unstable on a cluster with little space left. Contributed by Hairong Kuang.
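
The fix splits computeReplicationWork() so that the expensive replication-target selection runs without the global FSNamesystem lock, then re-acquires the lock and re-validates state before committing the work. A minimal sketch of that lock-split pattern follows; ReplicationScheduler, neededBlocks, and chooseTargetSlowly are simplified illustrative stand-ins, not the actual FSNamesystem API.

import java.util.ArrayList;
import java.util.List;

class ReplicationScheduler {
  private final List<String> neededBlocks = new ArrayList<String>();

  // Phase 1: pick candidate blocks while holding the global lock.
  synchronized List<String> chooseCandidates(int max) {
    int n = Math.min(max, neededBlocks.size());
    return new ArrayList<String>(neededBlocks.subList(0, n));
  }

  // Phase 2: choose a target WITHOUT the global lock, then re-acquire it
  // and re-check state before committing to the work.
  boolean scheduleOne(String block) {
    String target = chooseTargetSlowly(block);   // expensive, lock not held
    if (target == null) {
      return false;
    }
    synchronized (this) {
      // Re-check: the block may have been satisfied or removed while unlocked.
      if (!neededBlocks.remove(block)) {
        return false;
      }
      System.out.println("schedule " + block + " -> " + target);
      return true;
    }
  }

  private String chooseTargetSlowly(String block) {
    return "datanode-1";   // placeholder for the real placement policy
  }
}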

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=762879&r1=762878&r2=762879&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr  7 17:59:09 2009
@@ -1198,6 +1198,9 @@
     HADOOP-5548. Add synchronization for JobTracker methods in RecoveryManager.
     (Amareshwari Sriramadasu via sharad)
 
+    HADOOP-3810. NameNode seems unstable on a cluster with little space left.
+    (hairong)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=762879&r1=762878&r2=762879&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Apr  7 17:59:09 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -2445,30 +2446,57 @@
    * 
    * @return number of blocks scheduled for replication during this iteration.
    */
-  private synchronized int computeReplicationWork(
+  private int computeReplicationWork(
                                   int blocksToProcess) throws IOException {
-    int scheduledReplicationCount = 0;
+    // Choose the blocks to be replicated
+    List<List<Block>> blocksToReplicate = 
+      chooseUnderReplicatedBlocks(blocksToProcess);
 
+    // replicate blocks
+    int scheduledReplicationCount = 0;
+    for (int i=0; i<blocksToReplicate.size(); i++) {
+      for(Block block : blocksToReplicate.get(i)) {
+        if (computeReplicationWorkForBlock(block, i)) {
+          scheduledReplicationCount++;
+        }
+      }
+    }
+    return scheduledReplicationCount;
+  }
+  
+  /** Get a list of block lists to be replicated.
+   * The index of a block list represents its replication priority.
+   * 
+   * @param blocksToProcess the maximum number of blocks to choose
+   * @return a list of block lists to be replicated; 
+   *         the block list index represents its replication priority.
+   */
+  synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
+    // initialize data structure for the return value
+    List<List<Block>> blocksToReplicate = 
+      new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
+    for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {
+      blocksToReplicate.add(new ArrayList<Block>());
+    }
+    
     synchronized(neededReplications) {
       if (neededReplications.size() == 0) {
         missingBlocksInCurIter = 0;
         missingBlocksInPrevIter = 0;
+        return blocksToReplicate;
       }
-      // # of blocks to process equals either twice the number of live 
-      // data-nodes or the number of under-replicated blocks whichever is less
-      blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-      if(blocksToProcess == 0)
-        return scheduledReplicationCount;
-
+      
       // Go through all blocks that need replications.
-      // Select source and target nodes for replication.
-      Iterator<Block> neededReplicationsIterator = neededReplications.iterator();
+      BlockIterator neededReplicationsIterator = neededReplications.iterator();
       // skip to the first unprocessed block, which is at replIndex 
       for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
         neededReplicationsIterator.next();
       }
-      // process blocks
-      for(int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
+      // # of blocks to process equals either twice the number of live
+      // data-nodes or the number of under-replicated blocks, whichever is less
+      blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+
+      for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
         if( ! neededReplicationsIterator.hasNext()) {
           // start from the beginning
           replIndex = 0;
@@ -2483,52 +2511,100 @@
         }
 
         Block block = neededReplicationsIterator.next();
-
+        int priority = neededReplicationsIterator.getPriority();
+        if (priority < 0 || priority >= blocksToReplicate.size()) {
+          LOG.warn("Unexpected replication priority: " + priority + " " + block);
+        } else {
+          blocksToReplicate.get(priority).add(block);
+        }
+      } // end for
+    } // end synchronized
+    return blocksToReplicate;
+ }
+  
+  /** Replicate a block
+   * 
+   * @param block block to be replicated
+   * @param priority a hint of its priority in the neededReplications queue
+   * @return whether the block was scheduled for replication
+   */
+  boolean computeReplicationWorkForBlock(Block block, int priority) {
+    int requiredReplication, numEffectiveReplicas; 
+    List<DatanodeDescriptor> containingNodes;
+    DatanodeDescriptor srcNode;
+    
+    synchronized (this) {
+      synchronized (neededReplications) {
         // block should belong to a file
         INodeFile fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) { 
-          neededReplicationsIterator.remove(); // remove from neededReplications
+          neededReplications.remove(block, priority); // remove from neededReplications
           replIndex--;
-          continue;
+          return false;
         }
-        int requiredReplication = fileINode.getReplication(); 
+        requiredReplication = fileINode.getReplication(); 
 
         // get a source data-node
-        List<DatanodeDescriptor> containingNodes =
-                                          new ArrayList<DatanodeDescriptor>();
+        containingNodes = new ArrayList<DatanodeDescriptor>();
         NumberReplicas numReplicas = new NumberReplicas();
-        DatanodeDescriptor srcNode = 
-          chooseSourceDatanode(block, containingNodes, numReplicas);
-        
+        srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
         if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
             <= 0) {          
           missingBlocksInCurIter++;
         }
         if(srcNode == null) // block can not be replicated from any node
-          continue;
+          return false;
 
         // do not schedule more if enough replicas is already pending
-        int numEffectiveReplicas = numReplicas.liveReplicas() +
+        numEffectiveReplicas = numReplicas.liveReplicas() +
                                 pendingReplications.getNumReplicas(block);
         if(numEffectiveReplicas >= requiredReplication) {
-          neededReplicationsIterator.remove(); // remove from neededReplications
+          neededReplications.remove(block, priority); // remove from neededReplications
+          replIndex--;
+          NameNode.stateChangeLog.info("BLOCK* "
+              + "Removing block " + block
+              + " from neededReplications as it has enough replicas.");
+          return false;
+        }
+      }
+    }
+
+    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+    DatanodeDescriptor targets[] = replicator.chooseTarget(
+        requiredReplication - numEffectiveReplicas,
+        srcNode, containingNodes, null, block.getNumBytes());
+    if(targets.length == 0)
+      return false;
+
+    synchronized (this) {
+      synchronized (neededReplications) {
+        // Recheck since global lock was released
+        // block should belong to a file
+        INodeFile fileINode = blocksMap.getINode(block);
+        // abandoned block or block reopened for append
+        if(fileINode == null || fileINode.isUnderConstruction()) { 
+          neededReplications.remove(block, priority); // remove from neededReplications
+          replIndex--;
+          return false;
+        }
+        requiredReplication = fileINode.getReplication(); 
+
+        // do not schedule more if enough replicas are already pending
+        NumberReplicas numReplicas = countNodes(block);
+        numEffectiveReplicas = numReplicas.liveReplicas() +
+                               pendingReplications.getNumReplicas(block);
+        if(numEffectiveReplicas >= requiredReplication) {
+          neededReplications.remove(block, priority); // remove from neededReplications
           replIndex--;
           NameNode.stateChangeLog.info("BLOCK* "
               + "Removing block " + block
               + " from neededReplications as it has enough replicas.");
-          continue;
+          return false;
         }
 
-        // choose replication targets
-        DatanodeDescriptor targets[] = replicator.chooseTarget(
-            requiredReplication - numEffectiveReplicas,
-            srcNode, containingNodes, null, block.getNumBytes());
-        if(targets.length == 0)
-          continue;
         // Add block to the to be replicated list
         srcNode.addBlockToBeReplicated(block, targets);
-        scheduledReplicationCount++;
 
         for (DatanodeDescriptor dn : targets) {
           dn.incBlocksScheduled();
@@ -2541,10 +2617,10 @@
         NameNode.stateChangeLog.debug(
             "BLOCK* block " + block
             + " is moved from neededReplications to pendingReplications");
-        
+
         // remove from neededReplications
         if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplicationsIterator.remove(); // remove from neededReplications
+          neededReplications.remove(block, priority); // remove from neededReplications
           replIndex--;
         }
         if (NameNode.stateChangeLog.isInfoEnabled()) {
@@ -2563,7 +2639,8 @@
         }
       }
     }
-    return scheduledReplicationCount;
+    
+    return true;
   }
 
   /**

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=762879&r1=762878&r2=762879&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Tue Apr  7 17:59:09 2009
@@ -26,7 +26,7 @@
  * Blocks that have only one replica have the highest priority
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
-  private static final int LEVEL = 3;
+  static final int LEVEL = 3;
   private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
       
   /* constructor */
@@ -129,7 +129,7 @@
   }
       
   /* remove a block from an under-replication queue given a priority */
-  private boolean remove(Block block, int priLevel) {
+  boolean remove(Block block, int priLevel) {
     if(priLevel >= 0 && priLevel < LEVEL 
         && priorityQueues.get(priLevel).remove(block)) {
       NameNode.stateChangeLog.debug(
@@ -182,12 +182,15 @@
     }
   }
       
-  /* return a iterator of all the under replication blocks */
-  public synchronized Iterator<Block> iterator() {
-    return new Iterator<Block>() {
+  /* return an iterator of all the under replication blocks */
+  public synchronized BlockIterator iterator() {
+    return new BlockIterator();
+  }
+  
+    class BlockIterator implements Iterator<Block> {
       private int level;
       private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
-              
+      BlockIterator()  
       {
         level=0;
         for(int i=0; i<LEVEL; i++) {
@@ -214,6 +217,9 @@
       public void remove() {
         iterators.get(level).remove();
       }
+      
+      public int getPriority() {
+        return level;
     };
   }
 }
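
For context, a rough usage sketch of the new BlockIterator.getPriority() accessor; it mirrors what chooseUnderReplicatedBlocks() does in the FSNamesystem hunk above and is not part of the patch. It assumes same-package access and a populated UnderReplicatedBlocks instance named neededReplications.

// Bucket under-replicated blocks by priority before scheduling work.
List<List<Block>> byPriority =
    new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
  byPriority.add(new ArrayList<Block>());
}
UnderReplicatedBlocks.BlockIterator it = neededReplications.iterator();
while (it.hasNext()) {
  Block b = it.next();
  byPriority.get(it.getPriority()).add(b);   // getPriority() reports the queue level of the block just returned
}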