Posted to common-commits@hadoop.apache.org by to...@apache.org on 2007/04/06 22:29:23 UTC

svn commit: r526271 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/Block.java src/java/org/apache/hadoop/dfs/FSDirectory.java src/java/org/apache/hadoop/dfs/FSNamesystem.java src/java/org/apache/hadoop/net/NetworkTopology.java

Author: tomwhite
Date: Fri Apr  6 13:29:23 2007
New Revision: 526271

URL: http://svn.apache.org/viewvc?view=rev&rev=526271
Log:
HADOOP-988.  Change namenode to use a single map of blocks to metadata.  Contributed by Raghu Angadi.
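
The change collapses two parallel structures, FSDirectory.activeBlocks (Block -> INode) and the namesystem's old Block -> datanode-list map, into a single BlocksMap whose entries carry the owning INode, the replica locations, and the stored Block object itself. As a rough standalone sketch of that shape (simplified types only; String stands in for INode and DatanodeDescriptor, and the names are illustrative rather than the committed API):

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of a consolidated block-to-metadata map.
    class BlockMetaMap {
        // One entry per block: the owning file plus the replica locations.
        static class Info {
            String inode;       // stand-in for FSDirectory.INode
            String[] nodes;     // stand-in for DatanodeDescriptor[]
        }

        private final Map<Long, Info> map = new HashMap<Long, Info>();

        void addINode(long blockId, String inode) {
            Info info = map.get(blockId);
            if (info == null) {
                info = new Info();
                map.put(blockId, info);
            }
            info.inode = inode;
        }

        String getINode(long blockId) {
            Info info = map.get(blockId);
            return (info == null) ? null : info.inode;
        }

        public static void main(String[] args) {
            BlockMetaMap m = new BlockMetaMap();
            m.addINode(42L, "/user/foo/part-00000");
            // One lookup answers "which file owns this block?", where the
            // namenode previously had to consult FSDirectory separately.
            System.out.println(m.getINode(42L));
        }
    }

Keeping both pieces of metadata in one entry also lets an entry be dropped as soon as it has neither an owning file nor any replicas, which is what the new removeINode/removeNode methods do in the patch below.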

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Apr  6 13:29:23 2007
@@ -112,6 +112,9 @@
 35. HADOOP-1151.  Remove spurious printing to stderr in streaming 
     PipeMapRed.  (Koji Noguchi via tomwhite)
 
+36. HADOOP-988.  Change namenode to use a single map of blocks to metadata.
+    (Raghu Angadi via tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Fri Apr  6 13:29:23 2007
@@ -131,10 +131,10 @@
         }
     }
     public boolean equals(Object o) {
-        return (this.compareTo(o) == 0);
+        return blkid == ((Block)o).blkid;
     }
     
     public int hashCode() {
-        return 37 * 17 + (int) (getBlockId()^(getBlockId()>>>32));
+        return 37 * 17 + (int) (blkid^(blkid>>>32));
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri Apr  6 13:29:23 2007
@@ -308,9 +308,8 @@
         }
     }
 
-    
+    FSNamesystem namesystem = null;
     INode rootDir = new INode("");
-    Map activeBlocks = new HashMap();
     TreeMap activeLocks = new TreeMap();
     FSImage fsImage;  
     boolean ready = false;
@@ -318,13 +317,15 @@
     private MetricsRecord directoryMetrics = null;
     
     /** Access an existing dfs name directory. */
-    public FSDirectory() throws IOException {
+    public FSDirectory(FSNamesystem ns) throws IOException {
       this.fsImage = new FSImage();
+      namesystem = ns;
       initialize();
     }
 
-    public FSDirectory(FSImage fsImage) throws IOException {
+    public FSDirectory(FSImage fsImage, FSNamesystem ns) throws IOException {
       this.fsImage = fsImage;
+      namesystem = ns;
       initialize();
     }
     
@@ -415,7 +416,7 @@
                 int nrBlocks = (newNode.blocks == null) ? 0 : newNode.blocks.length;
                 // Add file->block mapping
                 for (int i = 0; i < nrBlocks; i++)
-                    activeBlocks.put(newNode.blocks[i], newNode);
+                    namesystem.blocksMap.addINode(newNode.blocks[i], newNode);
                 return true;
             } else {
                 return false;
@@ -586,7 +587,7 @@
                     targetNode.collectSubtreeBlocks(v);
                     for (Iterator it = v.iterator(); it.hasNext(); ) {
                         Block b = (Block) it.next();
-                        activeBlocks.remove(b);
+                        namesystem.blocksMap.removeINode(b);
                     }
                     return (Block[]) v.toArray(new Block[v.size()]);
                 }
@@ -755,27 +756,5 @@
             srcs = srcs.substring(0, srcs.length() - 1);
         }
         return srcs;
-    }
-
-    /**
-     * Returns whether the given block is one pointed-to by a file.
-     */
-    public boolean isValidBlock(Block b) {
-        synchronized (rootDir) {
-            if (activeBlocks.containsKey(b)) {
-                return true;
-            } else {
-                return false;
-            }
-        }
-    }
-
-    /**
-     * Returns whether the given block is one pointed-to by a file.
-     */
-    public INode getFileByBlock(Block b) {
-      synchronized (rootDir) {
-        return (INode)activeBlocks.get(b);
-      }
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Apr  6 13:29:23 2007
@@ -30,6 +30,7 @@
 
 import java.io.*;
 import java.util.*;
+import java.lang.UnsupportedOperationException;
 
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
@@ -60,11 +61,10 @@
     //
     // Stores the block-->datanode(s) map.  Updated only in response
     // to client-sent information.
-    // Mapping: Block -> TreeSet<DatanodeDescriptor>
+    // Mapping: Block -> { INode, datanodes, self ref } 
     //
-    Map<Block, List<DatanodeDescriptor>> blocksMap = 
-                              new HashMap<Block, List<DatanodeDescriptor>>();
-
+    BlocksMap blocksMap = new BlocksMap();
+    
     /**
      * Stores the datanode -> block map.  
      * <p>
@@ -245,7 +245,7 @@
 
         this.localMachine = hostname;
         this.port = port;
-        this.dir = new FSDirectory();
+        this.dir = new FSDirectory( this );
         StartupOption startOpt = (StartupOption)conf.get( 
                                 "dfs.namenode.startup", StartupOption.REGULAR );
         this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
@@ -299,7 +299,7 @@
      */
     FSNamesystem(FSImage fsImage) throws IOException {
         fsNamesystemObject = this;
-        this.dir = new FSDirectory(fsImage);
+        this.dir = new FSDirectory(fsImage, this);
     }
 
     /** Return the FSNamesystem object
@@ -366,14 +366,11 @@
           for (Iterator<Block> it = neededReplications.iterator(); 
                it.hasNext();) {
             Block block = it.next();
-            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
             out.print(block);
-            if (containingNodes != null) {
-              for (Iterator<DatanodeDescriptor> jt = containingNodes.iterator();
-                   jt.hasNext(); ) {
-                DatanodeDescriptor node = jt.next();
-                out.print(" " + node + " : " );
-              }
+            for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+                 jt.hasNext(); ) {
+              DatanodeDescriptor node = jt.next();
+              out.print(" " + node + " : " );
             }
             out.println("");
           }
@@ -401,7 +398,7 @@
     
     /* get replication factor of a block */
     private int getReplication( Block block ) {
-        FSDirectory.INode fileINode = dir.getFileByBlock(block);
+        FSDirectory.INode fileINode = blocksMap.getINode( block );
         if( fileINode == null ) { // block does not belong to any file
             return 0;
         } else {
@@ -485,9 +482,10 @@
 
         /* add a block to a under replication queue */
         synchronized boolean add(Block block) {
-            int curReplicas = countContainingNodes(blocksMap.get(block));
             int expectedReplicas = getReplication(block);
-            return add(block, curReplicas, expectedReplicas);
+            return add(block,
+                       countContainingNodes( block ),
+                       expectedReplicas);
         }
         
         /* remove a block from a under replication queue */
@@ -522,7 +520,7 @@
         
         /* remove a block from a under replication queue */
         synchronized boolean remove(Block block) {
-            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int curReplicas = countContainingNodes( block );
             int expectedReplicas = getReplication(block);
             return remove(block, curReplicas, expectedReplicas);
         }
@@ -530,7 +528,7 @@
         /* update the priority level of a block */
         synchronized void update(Block block,
                 int curReplicasDelta, int expectedReplicasDelta) {
-            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int curReplicas = countContainingNodes( block );
             int curExpectedReplicas = getReplication(block);
             int oldReplicas = curReplicas-curReplicasDelta;
             int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
@@ -614,20 +612,20 @@
         if (blocks != null) {
             results = new Object[2];
             DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
-            DatanodeDescriptor clientNode = getDatanodeByHost(clientMachine);
 
             for (int i = 0; i < blocks.length; i++) {
-                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
-                if (containingNodes == null) {
+                int numNodes = blocksMap.numNodes( blocks[i] );
+                if ( numNodes <= 0 ) {
                     machineSets[i] = new DatanodeDescriptor[0];
                 } else {
-                    machineSets[i] = new DatanodeDescriptor[containingNodes.size()];
-                    ArrayList<DatanodeDescriptor> containingNodesList =
-                      new ArrayList<DatanodeDescriptor>(containingNodes.size());
-                    containingNodesList.addAll(containingNodes);
-                    
-                    machineSets[i] = replicator.sortByDistance(
-                        clientNode, containingNodesList);
+                    machineSets[i] = new DatanodeDescriptor[ numNodes ];
+                    numNodes = 0;
+                    for( Iterator<DatanodeDescriptor> it = 
+                         blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
+                        machineSets[i][ numNodes++ ] = it.next();
+                    }
+                    clusterMap.sortByDistance( getDatanodeByHost(clientMachine),
+                                               machineSets[i] );
                 }
             }
 
@@ -998,9 +996,7 @@
         //
         for (int i = 0; i < nrBlocks; i++) {
             Block b = pendingBlocks[i];
-            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-            Block storedBlock = 
-                containingNodes.get(0).getBlock(b);
+            Block storedBlock = blocksMap.getStoredBlock( b );
             if ( storedBlock != null ) {
                 pendingBlocks[i] = storedBlock;
             }
@@ -1044,10 +1040,8 @@
         // the blocks.
         int numExpectedReplicas = pendingFile.getReplication();
         for (int i = 0; i < nrBlocks; i++) {
-          Collection<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
-          // filter out containingNodes that are marked for decommission.
-          int numCurrentReplica = countContainingNodes(containingNodes);
-
+            // filter out containingNodes that are marked for decommission.
+            int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
             if (numCurrentReplica < numExpectedReplicas) {
                 neededReplications.add(
                       pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
@@ -1065,7 +1059,7 @@
         Block b = null;
         do {
             b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
-        } while (dir.isValidBlock(b));
+        } while ( isValidBlock(b) );
         FileUnderConstruction v = pendingCreates.get(src);
         v.getBlocks().add(b);
         pendingCreateBlocks.add(b);
@@ -1083,9 +1077,7 @@
         FileUnderConstruction v = pendingCreates.get(src);
 
         for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
-            Block b = it.next();
-            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-            if (containingNodes == null || containingNodes.size() < this.minReplication) {
+            if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
                 return false;
             }
         }
@@ -1145,24 +1137,20 @@
         throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
       }
 
-      Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
-
       // Check how many copies we have of the block.  If we have at least one
       // copy on a live node, then we can delete it. 
-      if (containingNodes != null ) {
-        if ((countContainingNodes(containingNodes) > 1) || 
-            ((countContainingNodes(containingNodes) == 1) &&
-             (dn.isDecommissionInProgress() || dn.isDecommissioned()))) {
+      int count = countContainingNodes( blk );
+      if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() || 
+                                              dn.isDecommissioned() ))) {
           addToInvalidates(blk, dn);
           removeStoredBlock(blk, getDatanode(dn));
           NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
                                         + blk.getBlockName() + " on " 
                                         + dn.getName() + " listed for deletion.");
-        } else {
+      } else {
           NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
                                         + blk.getBlockName() + " on " 
                                         + dn.getName() + " is the only copy and was not deleted.");
-        }
       }
     }
 
@@ -1202,15 +1190,14 @@
         if (deletedBlocks != null) {
             for (int i = 0; i < deletedBlocks.length; i++) {
                 Block b = deletedBlocks[i];
-
-                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-                if (containingNodes != null) {
-                    for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
-                        DatanodeDescriptor node = it.next();
-                        addToInvalidates(b, node);
-                        NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
-                            + b.getBlockName() + " is added to invalidSet of " + node.getName() );
-                    }
+                
+                for ( Iterator<DatanodeDescriptor> it = 
+                      blocksMap.nodeIterator( b ); it.hasNext(); ) {
+                    DatanodeDescriptor node = it.next();
+                    addToInvalidates(b, node);
+                    NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
+                        + b.getBlockName() + " is added to invalidSet of " 
+                        + node.getName() );
                 }
             }
         }
@@ -1327,12 +1314,10 @@
         } else {
           String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 Collection<String> v = new ArrayList<String>();
-                if (containingNodes != null) {
-                  for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
+                for ( Iterator<DatanodeDescriptor> it = 
+                      blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
                     v.add( it.next().getHostName() );
-                  }
                 }
                 hosts[i-startBlock] = v.toArray(new String[v.size()]);
             }
@@ -2169,7 +2154,7 @@
             // they are added to recentInvalidateSets and will be sent out
             // thorugh succeeding heartbeat responses.
             //
-            if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
+            if (! isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
               if (obsolete.size() > FSConstants.BLOCK_INVALIDATE_CHUNK) {
                 addToInvalidates(b, node);
               } else {
@@ -2188,28 +2173,23 @@
      * @return the block that is stored in blockMap.
      */
     synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
-        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
-        if (containingNodes == null) {
-            //Create an arraylist with the current replication factor
-            FSDirectory.INode inode = dir.getFileByBlock(block);
-            int replication = (inode != null) ? 
-                              inode.getReplication() : defaultReplication;
-            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
-            blocksMap.put(block, containingNodes);
-        } else {
-            Block storedBlock = 
-                containingNodes.get(0).getBlock(block);
-            // update stored block's length.
-            if ( storedBlock != null ) {
-                if ( block.getNumBytes() > 0 ) {
-                    storedBlock.setNumBytes( block.getNumBytes() );
-                }
-                block = storedBlock;
+        
+        FSDirectory.INode fileINode = blocksMap.getINode( block );
+        int replication = (fileINode != null) ?  fileINode.getReplication() : 
+                                                 defaultReplication;
+        boolean added = blocksMap.addNode( block, node, replication );
+        
+        Block storedBlock = blocksMap.getStoredBlock( block ); //extra look up!
+        if ( storedBlock != null && block != storedBlock ) {
+            if ( block.getNumBytes() > 0 ) {
+               storedBlock.setNumBytes( block.getNumBytes() );
             }
+            block = storedBlock;
         }
+        
         int curReplicaDelta = 0;
-        if (! containingNodes.contains(node)) {
-            containingNodes.add(node);
+        
+        if ( added ) {
             curReplicaDelta = 1;
             // 
             // Hairong: I would prefer to set the level of next logrecord
@@ -2226,12 +2206,11 @@
                     + block.getBlockName() + " on " + node.getName());
         }
 
-        FSDirectory.INode fileINode = dir.getFileByBlock(block);
         if( fileINode == null )  // block does not belong to any file
             return block;
         
         // filter out containingNodes that are marked for decommission.
-        int numCurrentReplica = countContainingNodes(containingNodes)
+        int numCurrentReplica = countContainingNodes( block )
                               + pendingReplications.getNumReplicas(block);
         
         // check whether safe replication is reached for the block
@@ -2255,11 +2234,9 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-      Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
-      if( containingNodes == null )
-        return;
       Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
-      for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
+      for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( block ); 
+           it.hasNext(); ) {
           DatanodeDescriptor cur = it.next();
           Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
           if (excessBlocks == null || ! excessBlocks.contains(block)) {
@@ -2335,27 +2312,20 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
-        if (containingNodes == null || ! containingNodes.contains(node)) {
+        if ( !blocksMap.removeNode( block, node ) ) {
           NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
             +block.getBlockName()+" has already been removed from node "+node );
           return;
         }
-        containingNodes.remove(node);
         
-        // filter out containingNodes that are marked for decommission.
-        int numCurrentReplica = countContainingNodes(containingNodes);
-
-        decrementSafeBlockCount( numCurrentReplica );
-        if( containingNodes.isEmpty() )
-          blocksMap.remove(block);
+        decrementSafeBlockCount( block );
         //
         // It's possible that the block was removed because of a datanode
         // failure.  If the block is still valid, check if replication is
         // necessary.  In that case, put block on a possibly-will-
         // be-replicated list.
         //
-        FSDirectory.INode fileINode = dir.getFileByBlock(block);
+        FSDirectory.INode fileINode = blocksMap.getINode( block );
         if( fileINode != null ) {
             neededReplications.update(block, -1, 0);
         }
@@ -2636,28 +2606,27 @@
      * Counts the number of nodes in the given list. Skips over nodes
      * that are marked for decommission.
      */
-    private int countContainingNodes(Collection<DatanodeDescriptor> nodelist) {
-      if( nodelist == null ) return 0;
+    private int countContainingNodes(Iterator<DatanodeDescriptor> nodeIter) {
       int count = 0;
-      for (Iterator<DatanodeDescriptor> it = nodelist.iterator(); 
-           it.hasNext(); ) {
-        DatanodeDescriptor node = it.next();
+      while ( nodeIter.hasNext() ) {
+        DatanodeDescriptor node = nodeIter.next();
         if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
           count++;
         }
       }
       return count;
     }
+    
+    /** wrapper for countContainingNodes( Iterator ). */
+    private int countContainingNodes( Block b ) {
+      return countContainingNodes( blocksMap.nodeIterator( b ) );
+    }
 
-    /*
-     * Filter nodes that are marked for decommison in the given list. 
-     * Return a list of non-decommissioned nodes
-     */
-    private List<DatanodeDescriptor> filterDecommissionedNodes(
-        Collection<DatanodeDescriptor> nodelist) {
-      List<DatanodeDescriptor> nonCommissionedNodeList =
+    /** Returns a newly allocated list excluding the decommissioned nodes. */
+    ArrayList<DatanodeDescriptor> containingNodeList( Block b ) {
+      ArrayList<DatanodeDescriptor> nonCommissionedNodeList = 
         new ArrayList<DatanodeDescriptor>();
-      for (Iterator<DatanodeDescriptor> it = nodelist.iterator(); 
+      for( Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( b );
            it.hasNext(); ) {
         DatanodeDescriptor node = it.next();
         if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
@@ -2674,15 +2643,9 @@
         Block decommissionBlocks[] = srcNode.getBlocks();
         for (int i = 0; i < decommissionBlocks.length; i++) {
             Block block = decommissionBlocks[i];
-            FSDirectory.INode fileINode = dir.getFileByBlock(block);
-            if (fileINode == null) {
-                continue;
-            }
-            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); 
-            List<DatanodeDescriptor> nodes =
-                filterDecommissionedNodes(containingNodes);
-            int numCurrentReplica = nodes.size();
-            if (fileINode.getReplication() > numCurrentReplica) {
+            FSDirectory.INode fileINode = blocksMap.getINode( block );
+            if ( fileINode != null &&
+                 fileINode.getReplication() > countContainingNodes(block) ) {
               return true;
             }
         }
@@ -2758,22 +2721,20 @@
           }
           Block block = it.next();
           long blockSize = block.getNumBytes();
-          FSDirectory.INode fileINode = dir.getFileByBlock(block);
+          FSDirectory.INode fileINode = blocksMap.getINode( block );
           if (fileINode == null) { // block does not belong to any file
             it.remove();
           } else {
-            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+            List<DatanodeDescriptor> containingNodes = 
+                                                    containingNodeList(block);
             Collection<Block> excessBlocks = excessReplicateMap.get( 
                                                       srcNode.getStorageID() );
 
             // srcNode must contain the block, and the block must
             // not be scheduled for removal on that node
-            if (containingNodes != null && containingNodes.contains(srcNode)
+            if (containingNodes.contains(srcNode)
                 && (excessBlocks == null || ! excessBlocks.contains(block))) {
-              // filter out containingNodes that are marked for decommission.
-              List<DatanodeDescriptor> nodes = 
-                  filterDecommissionedNodes(containingNodes);
-              int numCurrentReplica = nodes.size() +
+              int numCurrentReplica = containingNodes.size() + 
                                       pendingReplications.getNumReplicas(block);
               if (numCurrentReplica >= fileINode.getReplication()) {
                 it.remove();
@@ -2782,7 +2743,7 @@
                   Math.min( fileINode.getReplication() - numCurrentReplica,
                             needed),
                   datanodeMap.get(srcNode.getStorageID()),
-                  nodes, null, blockSize);
+                  containingNodes, null, blockSize);
                 if (targets.length > 0) {
                   // Build items to return
                   replicateBlocks.add(block);
@@ -2808,7 +2769,7 @@
             DatanodeDescriptor targets[] = 
                       (DatanodeDescriptor[]) replicateTargetSets.get(i);
             int numCurrentReplica = numCurrentReplicas.get(i).intValue();
-            int numExpectedReplica = dir.getFileByBlock( block).getReplication(); 
+            int numExpectedReplica = blocksMap.getINode(block).getReplication(); 
             if (numCurrentReplica + targets.length >= numExpectedReplica) {
               neededReplications.remove(
                       block, numCurrentReplica, numExpectedReplica);
@@ -3291,29 +3252,8 @@
         }
         return nodes.toArray( results );
       }
-      
-      /** Return datanodes that sorted by their distances to <i>reader</i>
-       */
-      DatanodeDescriptor[] sortByDistance( 
-          final DatanodeDescriptor reader,
-          List<DatanodeDescriptor> nodes ) {
-          synchronized(clusterMap) {
-              if(reader != null && clusterMap.contains(reader)) {
-                  java.util.Collections.sort(nodes, new Comparator<DatanodeDescriptor>() {
-                      public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
-                          return clusterMap.getDistance(reader, n1)
-                          -clusterMap.getDistance(reader, n2);
-                      }
-                  });
-              }
-          }
-          return (DatanodeDescriptor[])nodes.toArray(
-                  new DatanodeDescriptor[nodes.size()]);
-      }
-      
     } //end of Replicator
 
-
     // Keeps track of which datanodes are allowed to connect to the namenode.
         
     private boolean inHostsList(DatanodeID node) {
@@ -3582,7 +3522,7 @@
      * <em>safe blocks</em>, those that have at least the minimal number of
      * replicas, and calculates the ratio of safe blocks to the total number
      * of blocks in the system, which is the size of
-     * {@link FSDirectory#activeBlocks}. When the ratio reaches the
+     * {@link blocksMap}. When the ratio reaches the
      * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
      * to monitor whether the safe mode extension is passed. Then it leaves safe
      * mode and destroys itself.
@@ -3654,7 +3594,9 @@
        */
       synchronized boolean isOn() {
         try {
-          isConsistent();   // SHV this is an assert
+          assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
+                 + "Total num of blocks, active blocks, or "
+                 + "total safe blocks don't match.";
         } catch( IOException e ) {
           System.err.print( StringUtils.stringifyException( e ));
         }
@@ -3801,26 +3743,20 @@
       
       /**
        * Checks consistency of the class state.
+       * This is costly and currently called only in assert.
        */
-      void isConsistent() throws IOException {
+      boolean isConsistent() throws IOException {
         if( blockTotal == -1 && blockSafe == -1 ) {
-          return; // manual safe mode
+          return true; // manual safe mode
         }
-        int activeBlocks = dir.activeBlocks.size();
-        if( blockTotal != activeBlocks )
-          throw new IOException( "blockTotal " + blockTotal 
-              + " does not match all blocks count. " 
-              + "activeBlocks = " + activeBlocks 
-              + ". safeBlocks = " + blockSafe 
-              + " safeMode is: " 
-              + ((safeMode == null) ? "null" : safeMode.toString()) ); 
-        if( blockSafe < 0 || blockSafe > blockTotal )
-          throw new IOException( "blockSafe " + blockSafe 
-              + " is out of range [0," + blockTotal + "]. " 
-              + "activeBlocks = " + activeBlocks 
-              + " safeMode is: " 
-              + ((safeMode == null) ? "null" : safeMode.toString()) ); 
-      } 
+        int activeBlocks = blocksMap.map.size();
+        for( Iterator<Collection<Block>> it = 
+             recentInvalidateSets.values().iterator(); it.hasNext(); ) {
+          activeBlocks -= it.next().size();
+        }
+        return ( blockTotal == activeBlocks ) ||
+               ( blockSafe >= 0 && blockSafe <= blockTotal );
+      }
     }
     
     /**
@@ -3880,10 +3816,10 @@
      * Decrement number of blocks that reached minimal replication.
      * @param replication current replication
      */
-    void decrementSafeBlockCount( int replication ) {
-      if( safeMode == null )
+    void decrementSafeBlockCount( Block b ) {
+      if( safeMode == null ) // mostly true
         return;
-      safeMode.decrementSafeBlockCount( (short)replication );
+      safeMode.decrementSafeBlockCount( (short)countContainingNodes( b ) );
     }
 
     /**
@@ -3892,7 +3828,7 @@
     void setBlockTotal() {
       if( safeMode == null )
         return;
-      safeMode.setBlockTotal( dir.activeBlocks.size() );
+      safeMode.setBlockTotal( blocksMap.map.size() );
     }
 
     /**
@@ -4013,6 +3949,177 @@
           response.sendError(HttpServletResponse.SC_GONE, errMsg);
           throw ie;
         }
+      }
+    }
+    
+    /**
+     * Returns whether the given block is one pointed-to by a file.
+     */
+    public boolean isValidBlock(Block b) {
+      return blocksMap.getINode( b ) != null;
+    }
+    
+    /**
+     * This class maintains the map from a block to its metadata.
+     * block's metadata currently includes INode it belongs to and
+     * the datanodes that store the block.
+     */
+    class BlocksMap {
+        
+      /**
+       * Internal class for block metadata
+       */
+      private class BlockInfo {
+        FSDirectory.INode              inode;
+        
+        /** nodes could contain some null entries at the end, so 
+         *  nodes.length >= number of datanodes. 
+         *  if nodes != null then nodes[0] != null.
+         */
+        DatanodeDescriptor             nodes[];
+        Block                          block; //block that was inserted.   
+      }
+      
+      private class NodeIterator implements Iterator<DatanodeDescriptor> {
+        NodeIterator( DatanodeDescriptor[] nodes ) {
+          arr = nodes;
+        }
+        DatanodeDescriptor[] arr;
+        int nextIdx = 0;
+        
+        public boolean hasNext() {
+          return arr != null && nextIdx < arr.length && arr[nextIdx] != null;
+        }
+        
+        public DatanodeDescriptor next() {
+          return arr[nextIdx++];
+        }
+        
+        public void remove()  {
+          throw new UnsupportedOperationException( "Sorry. can't remove." );
+        }
+      }
+      
+      Map<Block, BlockInfo> map = new HashMap<Block, BlockInfo>();
+      
+      /** add BlockInfo if mapping does not exist */
+      private BlockInfo checkBlockInfo( Block b ) {
+        BlockInfo info = map.get( b );
+        if ( info == null ) {
+          info = new BlockInfo();
+          info.block = b;
+          map.put( b, info );
+        }
+        return info;
+      }
+      
+      public FSDirectory.INode getINode( Block b ) {
+        BlockInfo info = map.get( b );
+        return ( info != null ) ? info.inode : null;
+      }
+            
+      public void addINode( Block b, FSDirectory.INode iNode ) {
+        BlockInfo info = checkBlockInfo( b );
+        info.inode = iNode;
+      }
+      
+      public void removeINode( Block b ) {
+        BlockInfo info = map.get( b );
+        if ( info != null ) {
+          info.inode = null;
+          if ( info.nodes == null ) {
+            map.remove( b );
+          }
+        }
+      }
+      
+      /** Returns the block object if it exists in the map */
+      public Block getStoredBlock( Block b ) {
+        BlockInfo info = map.get( b );
+        return ( info != null ) ? info.block : null;
+      }
+      
+      /** Returned Iterator does not support remove(). */
+      public Iterator<DatanodeDescriptor> nodeIterator( Block b ) {
+        BlockInfo info = map.get( b );
+        return new NodeIterator( ( info != null ) ? info.nodes : null );
+      }
+      
+      /** counts number of containing nodes. Better than using iterator. */
+      public int numNodes( Block b ) {
+        int count = 0;
+        BlockInfo info = map.get( b );
+        if ( info != null && info.nodes != null ) {
+          count = info.nodes.length;
+          while ( info.nodes[ count-1 ] == null ) // mostly false
+            count--;
+        }
+        return count;
+      }
+      
+      /** returns true if the node does not already exist and is added.
+       * false if the node already exists.*/
+      public boolean addNode( Block b, 
+                              DatanodeDescriptor node,
+                              int replicationHint ) {
+        BlockInfo info = checkBlockInfo( b );
+        if ( info.nodes == null ) {
+          info.nodes = new DatanodeDescriptor[ replicationHint ];
+        }
+        
+        DatanodeDescriptor[] arr = info.nodes;
+        for( int i=0; i < arr.length; i++ ) {
+          if ( arr[i] == null ) {
+            arr[i] = node;
+            return true;
+          }
+          if ( arr[i] == node ) {
+            return false;
+          }
+        }
+
+        /* Not enough space left. Create a new array. Should normally 
+         * happen only when replication is manually increased by the user. */
+        info.nodes = new DatanodeDescriptor[ arr.length + 1 ];
+        for( int i=0; i < arr.length; i++ ) {
+          info.nodes[i] = arr[i];
+        }
+        info.nodes[ arr.length ] = node;
+        return true;
+      }
+      
+      public boolean removeNode( Block b, DatanodeDescriptor node ) {
+        BlockInfo info = map.get( b );
+        if ( info == null || info.nodes == null )
+          return false;
+
+        boolean removed = false;
+        // swap lastNode and node's location. set lastNode to null.
+        DatanodeDescriptor[] arr = info.nodes;
+        int lastNode = -1;
+        for( int i=arr.length-1; i >= 0; i-- ) {
+          if ( lastNode < 0 && arr[i] != null )
+            lastNode = i;
+          if ( arr[i] == node ) {
+            arr[i] = arr[ lastNode ];
+            arr[ lastNode ] = null;
+            removed = true;
+            break;
+          }
+        }
+          
+        /*
+         * if ( (lastNode + 1) < arr.length/4 ) {
+         *    we could trim the array.
+         * } 
+         */
+        if ( arr[0] == null ) { // no datanodes left.
+          info.nodes = null;
+          if ( info.inode == null ) {
+            map.remove( b );
+          }
+        }
+        return removed;
       }
     }
 }
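
BlocksMap.removeNode above keeps each replica array dense: live entries first, any null slots only at the tail, so numNodes() can stop at the first null and nodeIterator() never has to skip gaps. Sizing the array to the file's replication factor up front (the replicationHint passed to addNode) means the common case never reallocates; only a manual replication increase grows it by one slot. The following is a small standalone demonstration of that swap-with-last removal over a plain object array, not the committed code:

    import java.util.Arrays;

    // Swap-with-last removal that keeps a fixed-size array dense:
    // live entries first, nulls only at the tail.
    public class DenseRemoveDemo {
        static boolean remove(Object[] arr, Object target) {
            int last = -1;
            for (int i = arr.length - 1; i >= 0; i--) {
                if (last < 0 && arr[i] != null) {
                    last = i;               // index of the last live entry
                }
                if (arr[i] == target) {
                    arr[i] = arr[last];     // move the last live entry into the hole
                    arr[last] = null;       // and clear its old slot
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            String dn1 = "dn1", dn2 = "dn2", dn3 = "dn3";
            Object[] nodes = { dn1, dn2, dn3, null };    // capacity 4, three live entries
            remove(nodes, dn1);
            System.out.println(Arrays.toString(nodes));  // [dn3, dn2, null, null]
        }
    }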

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Fri Apr  6 13:29:23 2007
@@ -19,8 +19,10 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -531,5 +533,26 @@
             tree.append( "\n");
         }
         return tree.toString();
+    }
+
+    /* Set and used only inside sortByDistance. 
+     * This saves an allocation each time we sort.
+     */
+    private DatanodeDescriptor distFrom = null;
+    private final Comparator<DatanodeDescriptor> nodeDistanceComparator = 
+      new Comparator<DatanodeDescriptor>() {
+        public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
+          return getDistance(distFrom, n1) - getDistance(distFrom, n2);
+        }
+    };
+      
+    /** Sorts nodes array by their distances to <i>reader</i>. */
+    public synchronized void sortByDistance( final DatanodeDescriptor reader,
+                                             DatanodeDescriptor[] nodes ) { 
+      if(reader != null && contains(reader)) {
+        distFrom = reader;
+        Arrays.sort( nodes, nodeDistanceComparator );
+        distFrom = null;
+      }
     }
 }
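
The sortByDistance rewrite above trades the per-call Comparator and temporary List of the old Replicator.sortByDistance for one reusable comparator plus the distFrom field, with the synchronized method keeping that shared field safe; sorts are still serialized, just as the old code synchronized on clusterMap. A standalone sketch of the same pattern, using a toy same-rack metric in place of getDistance and illustrative names throughout:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;

    // Reusable comparator for distance sorting; the shared 'from' field is
    // only touched inside the synchronized sort method.
    public class DistanceSortDemo {
        private final Map<String, Integer> rackOf = new HashMap<String, Integer>();
        private String from;    // set and cleared only inside sortByDistance

        private final Comparator<String> byDistance = new Comparator<String>() {
            public int compare(String a, String b) {
                return dist(from, a) - dist(from, b);
            }
        };

        // Toy metric standing in for NetworkTopology.getDistance():
        // 0 for a node on the reader's rack, 2 otherwise.
        int dist(String a, String b) {
            return rackOf.get(a).equals(rackOf.get(b)) ? 0 : 2;
        }

        public synchronized void sortByDistance(String reader, String[] nodes) {
            from = reader;
            Arrays.sort(nodes, byDistance);
            from = null;
        }

        public static void main(String[] args) {
            DistanceSortDemo topo = new DistanceSortDemo();
            topo.rackOf.put("dn0", 1);
            topo.rackOf.put("dn1", 1);
            topo.rackOf.put("dn2", 2);
            String[] nodes = { "dn2", "dn1", "dn0" };
            topo.sortByDistance("dn0", nodes);
            System.out.println(Arrays.toString(nodes));  // same-rack nodes come first
        }
    }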