You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/25 21:58:55 UTC

svn commit: r499967 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSNamesystem.java

Author: cutting
Date: Thu Jan 25 12:58:54 2007
New Revision: 499967

URL: http://svn.apache.org/viewvc?view=rev&rev=499967
Log:
HADOOP-659.  In HDFS, prioritize replication of blocks based on their current replication level.  Contributed by Hairong.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=499967&r1=499966&r2=499967
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 25 12:58:54 2007
@@ -102,6 +102,11 @@
 13. HADOOP-879.  Fix InputFormatBase to handle output generated by
     MapFileOutputFormat.  (cutting)
 
+14. HADOOP-659.  In HDFS, prioritize replication of blocks based on
+    current replication level.  Blocks which are severely
+    under-replicated should be further replicated before blocks which
+    are less under-replicated.  (Hairong Kuang via cutting)
+
 
 Release 0.10.0 - 2007-01-05
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=499967&r1=499966&r2=499967
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 25 12:58:54 2007
@@ -152,7 +152,7 @@
     // We also store pending replication-orders.
     // Set of: Block
     //
-    private Collection<Block> neededReplications = new TreeSet<Block>();
+    private UnderReplicationBlocks neededReplications = new UnderReplicationBlocks();
     private Collection<Block> pendingReplications = new TreeSet<Block>();
 
     //
@@ -277,7 +277,189 @@
           }
         }
     }
+    
+    /* Look up the replication factor of the file that owns a block;
+     * a block that does not belong to any file reports 0.
+     */
+    private int getReplication( Block block ) {
+        FSDirectory.INode owner = dir.getFileByBlock(block);
+        return ( owner == null )
+                ? 0 // block does not belong to any file
+                : owner.getReplication();
+    }
 
+    /* Tracks under-replicated blocks in LEVEL TreeSets, one per priority level.
+     * Priority 0 is the highest; iterator() walks queue 0 first, then 1, then 2.
+     * An under-replicated block with exactly one current replica gets priority 0.
+     */
+    private class UnderReplicationBlocks {
+        private static final int LEVEL = 3; // number of queues; also getPriority's "not under-replicated" value
+        TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL]; // array index == priority level
+        
+        /* constructor: creates one empty TreeSet per priority level */
+        UnderReplicationBlocks() {
+            for(int i=0; i<LEVEL; i++) {
+                priorityQueues[i] = new TreeSet<Block>();
+            }
+        }
+        
+        /* Return the total number of blocks held across all priority queues */
+        synchronized int size() {
+            int size = 0;
+            for( int i=0; i<LEVEL; i++ ) {
+                size += priorityQueues[i].size();
+            }
+            return size;
+        }
+        
+        /* Return the queue index for a block, or LEVEL if it needs no replication
+        * @param block an under-replicated block (not read by this method)
+        * @param curReplicas current number of replicas of the block
+        * @param expectedReplicas expected number of replicas of the block
+        */
+        private int getPriority(Block block, 
+                int curReplicas, int expectedReplicas) {
+            if (curReplicas>=expectedReplicas) {
+                return LEVEL; // no need to replicate; LEVEL is not a valid queue index
+            } else if(curReplicas==1) {
+                return 0; // highest priority
+            } else if(curReplicas*3<expectedReplicas) { // fewer than one third of the expected replicas
+                return 1; // also reached when curReplicas is 0
+            } else {
+                return 2; // under-replicated, but at least one third of the expected replicas exist
+            }
+        }
+        
+        /* add a block to the queue chosen by getPriority; returns true only if it was newly added
+         * @param block an under-replicated block
+         * @param curReplicas current number of replicas of the block
+         * @param expectedReplicas expected number of replicas of the block
+         */
+        synchronized boolean add(
+            Block block, int curReplicas, int expectedReplicas) {
+            if(expectedReplicas <= curReplicas) { // not under-replicated: nothing to queue
+                return false;
+            }
+            int priLevel = getPriority(block, curReplicas, expectedReplicas);
+            if( priorityQueues[priLevel].add(block) ) { // TreeSet.add returns false if already present
+                NameNode.stateChangeLog.debug(
+                        "BLOCK* NameSystem.UnderReplicationBlock.add:"
+                      + block.getBlockName()
+                      + " has only "+curReplicas
+                      + " replicas and need " + expectedReplicas
+                      + " replicas so is added to neededReplications"
+                      + " at priority level " + priLevel );
+                return true;
+            }
+            return false;
+        }
+
+        /* add a block, taking its replica counts from countContainingNodes and getReplication */
+        synchronized boolean add(Block block) {
+            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int expectedReplicas = getReplication(block);
+            return add(block, curReplicas, expectedReplicas);
+        }
+        
+        /* remove a block given its earlier replica counts; returns false without searching if they were not under-replicated */
+        synchronized boolean remove(Block block, 
+                int oldReplicas, int oldExpectedReplicas) {
+            if(oldExpectedReplicas <= oldReplicas) {
+                return false;
+            }
+            int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+            return remove(block, priLevel);
+        }
+        
+        /* remove a block from queue priLevel, falling back to a scan of the other queues */
+        private boolean remove(Block block, int priLevel ) {
+            if( priorityQueues[priLevel].remove(block) ) {
+                NameNode.stateChangeLog.debug(
+                     "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+                   + "Removing block " + block.getBlockName()
+                   + " from priority queue "+ priLevel );
+                return true;
+            } else { // not in the expected queue: try every other level
+                for(int i=0; i<LEVEL; i++) {
+                    if( i!=priLevel && priorityQueues[i].remove(block) ) {
+                        NameNode.stateChangeLog.debug(
+                             "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+                           + "Removing block " + block.getBlockName()
+                           + " from priority queue "+ i );
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+        
+        /* remove a block, using its current replica counts to pick the queue to try first */
+        synchronized boolean remove(Block block) {
+            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int expectedReplicas = getReplication(block);
+            return remove(block, curReplicas, expectedReplicas);
+        }
+        
+        /* move a block between queues after its replica counts changed; each delta is (new - old) */
+        synchronized void update(Block block,
+                int curReplicasDelta, int expectedReplicasDelta) {
+            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int curExpectedReplicas = getReplication(block);
+            int oldReplicas = curReplicas-curReplicasDelta; // reconstruct the counts before the change
+            int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+            int curPri = getPriority(block, curReplicas, curExpectedReplicas);
+            int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+            if( oldPri != LEVEL && oldPri != curPri ) { // was under-replicated at a different level
+                remove(block, oldPri);
+            }
+            if( curPri != LEVEL && oldPri != curPri // nothing is added when the level is unchanged
+                    && priorityQueues[curPri].add(block)) {
+                NameNode.stateChangeLog.debug(
+                        "BLOCK* NameSystem.UnderReplicationBlock.update:"
+                      + block.getBlockName()
+                      + " has only "+curReplicas
+                      + " replicas and need " + curExpectedReplicas
+                      + " replicas so is added to neededReplications"
+                      + " at priority level " + curPri );
+            }
+        }
+        
+        /* return an iterator over all queued blocks in priority order; its remove() is unsupported */
+        synchronized Iterator<Block> iterator() { // NOTE: only construction is synchronized; next()/hasNext() are not
+            return new Iterator<Block>() {
+                int level; // index of the queue currently being read
+                Iterator<Block>[] iterator = new Iterator[LEVEL];
+                
+                {
+                    level=0;
+                    for(int i=0; i<LEVEL; i++) {
+                        iterator[i] = priorityQueues[i].iterator();
+                    }
+                }
+                
+                private void update() { // skip past exhausted queues, stopping at the last one
+                    while( level< LEVEL-1 && !iterator[level].hasNext()  ) {
+                        level++;
+                    }
+                }
+                
+                public Block next() {
+                    update();
+                    return iterator[level].next();
+                }
+                
+                public boolean hasNext() {
+                    update();
+                    return iterator[level].hasNext();
+                }
+                
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+    
     /////////////////////////////////////////////////////////
     //
     // These methods are called by HadoopFS clients
@@ -347,20 +529,18 @@
       if( oldRepl == replication ) // the same replication
         return true;
 
-      synchronized( neededReplications ) {
-        if( oldRepl < replication ) { 
-          // old replication < the new one; need to replicate
-          LOG.info("Increasing replication for file " + src 
-                    + ". New replication is " + replication );
-          for( int idx = 0; idx < fileBlocks.length; idx++ )
-            neededReplications.add( fileBlocks[idx] );
-        } else {  
-          // old replication > the new one; need to remove copies
-          LOG.info("Reducing replication for file " + src 
-                    + ". New replication is " + replication );
-          for( int idx = 0; idx < fileBlocks.length; idx++ )
-            proccessOverReplicatedBlock( fileBlocks[idx], replication );
-        }
+      // update needReplication priority queues
+      LOG.info("Increasing replication for file " + src 
+              + ". New replication is " + replication );
+      for( int idx = 0; idx < fileBlocks.length; idx++ )
+          neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
+      
+      if( oldRepl > replication ) {  
+        // old replication > the new one; need to remove copies
+        LOG.info("Reducing replication for file " + src 
+                + ". New replication is " + replication );
+        for( int idx = 0; idx < fileBlocks.length; idx++ )
+          proccessOverReplicatedBlock( fileBlocks[idx], replication );
       }
       return true;
     }
@@ -715,19 +895,15 @@
 
         // Now that the file is real, we need to be sure to replicate
         // the blocks.
+        int numExpectedReplicas = pendingFile.getReplication();
         for (int i = 0; i < nrBlocks; i++) {
           SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
           // filter out containingNodes that are marked for decommission.
           int numCurrentReplica = countContainingNodes(containingNodes);
 
-            if (numCurrentReplica < pendingFile.getReplication()) {
-                   NameNode.stateChangeLog.debug(
-                          "DIR* NameSystem.completeFile:"
-                        + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
-                        +" replicas so is added to neededReplications");           
-                synchronized (neededReplications) {
-                    neededReplications.add(pendingBlocks[i]);
-                }
+            if (numCurrentReplica < numExpectedReplicas) {
+                neededReplications.add(
+                      pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
             }
         }
         return COMPLETE_SUCCESS;
@@ -1608,8 +1784,10 @@
             containingNodes = new TreeSet<DatanodeDescriptor>();
             blocksMap.put(block, containingNodes);
         }
+        int curReplicaDelta = 0;
         if (! containingNodes.contains(node)) {
             containingNodes.add(node);
+            curReplicaDelta = 1;
             // 
             // Hairong: I would prefer to set the level of next logrecord
             // to be debug.
@@ -1625,34 +1803,24 @@
                     + block.getBlockName() + " on " + node.getName());
         }
 
-        synchronized (neededReplications) {
-            FSDirectory.INode fileINode = dir.getFileByBlock(block);
-            if( fileINode == null )  // block does not belong to any file
-                return;
-
-            // filter out containingNodes that are marked for decommission.
-            int numCurrentReplica = countContainingNodes(containingNodes);
-
-            // check whether safe replication is reached for the block
-            // only if it is a part of a files
-            incrementSafeBlockCount( numCurrentReplica );
-            short fileReplication = fileINode.getReplication();
-            if (numCurrentReplica >= fileReplication ) {
-                neededReplications.remove(block);
-                pendingReplications.remove(block);
-                NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
-                        +block.getBlockName()+" has "+ numCurrentReplica
-                        +" replicas so is removed from neededReplications and pendingReplications" );
-
-            } else {// numCurrentReplica < fileReplication
-                neededReplications.add(block);
-                NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
-                    +block.getBlockName()+" has only "+ numCurrentReplica
-                    +" replicas so is added to neededReplications" );
-            }
-
-            proccessOverReplicatedBlock( block, fileReplication );
-        }
+        FSDirectory.INode fileINode = dir.getFileByBlock(block);
+        if( fileINode == null )  // block does not belong to any file
+            return;
+        
+        // filter out containingNodes that are marked for decommission.
+        int numCurrentReplica = countContainingNodes(containingNodes);
+        
+        // check whether safe replication is reached for the block
+        // only if it is a part of a files
+        incrementSafeBlockCount( numCurrentReplica );
+        
+        // handle underReplication/overReplication
+        short fileReplication = fileINode.getReplication();
+        neededReplications.update(block, curReplicaDelta, 0);
+        if (numCurrentReplica >= fileReplication ) {
+            pendingReplications.remove(block);
+        }        
+        proccessOverReplicatedBlock( block, fileReplication );
     }
     
     /**
@@ -1748,8 +1916,12 @@
           return;
         }
         containingNodes.remove(node);
-        decrementSafeBlockCount( containingNodes.size() );
-        if( containingNodes.size() == 0 )
+        
+        // filter out containingNodes that are marked for decommission.
+        int numCurrentReplica = countContainingNodes(containingNodes);
+
+        decrementSafeBlockCount( numCurrentReplica );
+        if( containingNodes.isEmpty() )
           blocksMap.remove(block);
         //
         // It's possible that the block was removed because of a datanode
@@ -1758,13 +1930,8 @@
         // be-replicated list.
         //
         FSDirectory.INode fileINode = dir.getFileByBlock(block);
-        if( fileINode != null && (countContainingNodes(containingNodes) < fileINode.getReplication())) {
-            synchronized (neededReplications) {
-                neededReplications.add(block);
-            }
-            NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-                    +block.getBlockName()+" has only "+containingNodes.size()
-                    +" replicas so is added to neededReplications" );
+        if( fileINode != null ) {
+            neededReplications.update(block, -1, 0);
         }
 
         //
@@ -1896,9 +2063,7 @@
                 // replicated.
                 Block decommissionBlocks[] = node.getBlocks();
                 for (int j = 0; j < decommissionBlocks.length; j++) {
-                  synchronized (neededReplications) {
-                    neededReplications.add(decommissionBlocks[j]);
-                  }
+                    neededReplications.update(decommissionBlocks[j], -1, 0);
                 }
               }
               break;
@@ -2107,15 +2272,13 @@
      * reside on the specified node. Otherwise returns false.
      */
     private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
-      synchronized (neededReplications) {
         for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){
-          Block block = it.next();
-          Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); 
-          if (containingNodes.contains(srcNode)) {
-              return true;
-          }
+            Block block = it.next();
+            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); 
+            if (containingNodes.contains(srcNode)) {
+                return true;
+            }
         }
-      }
       return false;
     }
 
@@ -2237,9 +2400,10 @@
             DatanodeDescriptor targets[] = 
                       (DatanodeDescriptor[]) replicateTargetSets.get(i);
             int numCurrentReplica = numCurrentReplicas.get(i).intValue();
-            if (numCurrentReplica + targets.length >= 
-                    dir.getFileByBlock( block).getReplication() ) {
-              neededReplications.remove(block);
+            int numExpectedReplica = dir.getFileByBlock( block).getReplication(); 
+            neededReplications.update(
+                    block, numCurrentReplica, numExpectedReplica);
+            if (numCurrentReplica + targets.length >= numExpectedReplica) {
               pendingReplications.add(block);
               NameNode.stateChangeLog.debug(
                 "BLOCK* NameSystem.pendingTransfer: "