You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/16 21:11:43 UTC

svn commit: r496845 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/Block.java src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java src/java/org/apache/hadoop/dfs/FSNamesystem.java

Author: cutting
Date: Tue Jan 16 12:11:42 2007
New Revision: 496845

URL: http://svn.apache.org/viewvc?view=rev&rev=496845
Log:
HADOOP-803.  Reduce memory footprint of HDFS namenode by replacing the TreeSet of block locations with an ArrayList.  Contributed by Raghu.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 12:11:42 2007
@@ -17,6 +17,10 @@
  4. HADOOP-757.  Fix "Bad File Descriptor" exception in HDFS client
     when an output file is closed twice.  (Raghu Angadi via cutting)
 
+ 5. HADOOP-803.  Reduce memory footprint of HDFS namenode by replacing
+    the TreeSet of block locations with an ArrayList.
+    (Raghu Angadi via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Tue Jan 16 12:11:42 2007
@@ -121,18 +121,11 @@
     // Comparable
     /////////////////////////////////////
     public int compareTo(Object o) {
-        Block b = (Block) o;
-        if (getBlockId() < b.getBlockId()) {
-            return -1;
-        } else if (getBlockId() == b.getBlockId()) {
-            return 0;
-        } else {
-            return 1;
-        }
+        long diff = getBlockId() - ((Block)o).getBlockId();
+        return ( diff < 0 ) ? -1 : ( ( diff > 0 ) ? 1 : 0 );
     }
     public boolean equals(Object o) {
-        Block b = (Block) o;
-        return (this.compareTo(b) == 0);
+        return (this.compareTo(o) == 0);
     }
     
     public int hashCode() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue Jan 16 12:11:42 2007
@@ -34,7 +34,7 @@
  **************************************************/
 class DatanodeDescriptor extends DatanodeInfo {
 
-  private volatile Collection<Block> blocks = new TreeSet<Block>();
+  private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
@@ -60,17 +60,12 @@
 
   /**
    */
-  void updateBlocks(Block newBlocks[]) {
-    blocks.clear();
-    for (int i = 0; i < newBlocks.length; i++) {
-      blocks.add(newBlocks[i]);
-    }
-  }
-
-  /**
-   */
   void addBlock(Block b) {
-    blocks.add(b);
+      blocks.put(b, b);
+  }
+  
+  void removeBlock(Block b) {
+      blocks.remove(b);
   }
 
   void resetBlocks() {
@@ -94,10 +89,14 @@
   }
   
   Block[] getBlocks() {
-    return (Block[]) blocks.toArray(new Block[blocks.size()]);
+    return blocks.keySet().toArray(new Block[blocks.size()]);
   }
 
   Iterator<Block> getBlockIterator() {
-    return blocks.iterator();
+    return blocks.keySet().iterator();
+  }
+  
+  Block getBlock(long blockId) {
+      return blocks.get( new Block(blockId, 0) );
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jan 16 12:11:42 2007
@@ -59,8 +59,8 @@
     // to client-sent information.
     // Mapping: Block -> TreeSet<DatanodeDescriptor>
     //
-    Map<Block, SortedSet<DatanodeDescriptor>> blocksMap = 
-                              new HashMap<Block, SortedSet<DatanodeDescriptor>>();
+    Map<Block, List<DatanodeDescriptor>> blocksMap = 
+                              new HashMap<Block, List<DatanodeDescriptor>>();
 
     /**
      * Stores the datanode -> block map.  
@@ -179,6 +179,8 @@
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
+    // Default replication
+    private int defaultReplication;
     // heartbeatRecheckInterval is how often namenode checks for expired datanodes
     private long heartbeatRecheckInterval;
     // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -199,6 +201,7 @@
                         int port,
                         NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
+        this.defaultReplication = conf.getInt("dfs.replication", 3);
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
         if( minReplication <= 0 )
@@ -299,7 +302,7 @@
             DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
 
             for (int i = 0; i < blocks.length; i++) {
-              SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 if (containingNodes == null) {
                     machineSets[i] = new DatanodeDescriptor[0];
                 } else {
@@ -660,22 +663,16 @@
         //
         // We have the pending blocks, but they won't have
         // length info in them (as they were allocated before
-        // data-write took place).  So we need to add the correct
-        // length info to each
-        //
-        // REMIND - mjc - this is very inefficient!  We should
-        // improve this!
+        // data-write took place). Find the block stored in
+        // node descriptor.
         //
         for (int i = 0; i < nrBlocks; i++) {
             Block b = pendingBlocks[i];
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-            DatanodeDescriptor node = containingNodes.first();
-            for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
-                Block cur = it.next();
-                if (b.getBlockId() == cur.getBlockId()) {
-                    b.setNumBytes(cur.getNumBytes());
-                    break;
-                }
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            Block storedBlock = 
+                containingNodes.get(0).getBlock(b.getBlockId());
+            if ( storedBlock != null ) {
+                pendingBlocks[i] = storedBlock;
             }
         }
         
@@ -716,7 +713,7 @@
         // Now that the file is real, we need to be sure to replicate
         // the blocks.
         for (int i = 0; i < nrBlocks; i++) {
-          SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+          List<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
           // filter out containingNodes that are marked for decommission.
           int numCurrentReplica = countContainingNodes(containingNodes);
 
@@ -761,7 +758,7 @@
 
         for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = it.next();
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes == null || containingNodes.size() < this.minReplication) {
                 return false;
             }
@@ -806,7 +803,7 @@
             for (int i = 0; i < deletedBlocks.length; i++) {
                 Block b = deletedBlocks[i];
 
-                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+                List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
                 if (containingNodes != null) {
                     for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
                         DatanodeDescriptor node = it.next();
@@ -935,7 +932,7 @@
         } else {
           String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-              SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 Collection<String> v = new ArrayList<String>();
                 if (containingNodes != null) {
                   for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
@@ -1494,12 +1491,16 @@
         // between the old and new block report.
         //
         int newPos = 0;
-        boolean modified = false;
         Iterator<Block> iter = node.getBlockIterator();
         Block oldblk = iter.hasNext() ? iter.next() : null;
         Block newblk = (newReport != null && newReport.length > 0) ? 
                         newReport[0]	: null;
 
+        // common case is that most of the blocks from the datanode
+        // matches blocks in datanode descriptor.                
+        Collection<Block> toRemove = new LinkedList<Block>();
+        Collection<Block> toAdd = new LinkedList<Block>();
+        
         while (oldblk != null || newblk != null) {
            
             int cmp = (oldblk == null) ? 1 : 
@@ -1513,25 +1514,25 @@
                          ? newReport[newPos] : null;
             } else if (cmp < 0) {
                 // The old report has a block the new one does not
+                toRemove.add(oldblk);
                 removeStoredBlock(oldblk, node);
-                modified = true;
                 oldblk = iter.hasNext() ? iter.next() : null;
             } else {
                 // The new report has a block the old one does not
-                addStoredBlock(newblk, node);
-                modified = true;
+                toAdd.add(addStoredBlock(newblk, node));
                 newPos++;
                 newblk = (newPos < newReport.length)
                          ? newReport[newPos] : null;
             }
         }
-        //
-        // Modify node so it has the new blockreport
-        //
-        if (modified) {
-            node.updateBlocks(newReport);
+        
+        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+            node.removeBlock( i.next() );
         }
-
+        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+            node.addBlock( i.next() );
+        }
+        
         //
         // We've now completely updated the node's block report profile.
         // We now go through all its blocks and find which ones are invalid,
@@ -1560,12 +1561,25 @@
     /**
      * Modify (block-->datanode) map.  Remove block from set of 
      * needed replications if this takes care of the problem.
+     * @return the block that is stored in blockMap.
      */
-    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
-      SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null) {
-            containingNodes = new TreeSet<DatanodeDescriptor>();
+            //Create an arraylist with the current replication factor
+            FSDirectory.INode inode = dir.getFileByBlock(block);
+            int replication = (inode != null) ? 
+                              inode.getReplication() : defaultReplication;
+            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
             blocksMap.put(block, containingNodes);
+        } else {
+            Block storedBlock = 
+                containingNodes.get(0).getBlock(block.getBlockId());
+            // update stored block's length.
+            if ( block.getNumBytes() > 0 ) {
+                storedBlock.setNumBytes( block.getNumBytes() );
+            }
+            block = storedBlock;
         }
         if (! containingNodes.contains(node)) {
             containingNodes.add(node);
@@ -1587,7 +1601,7 @@
         synchronized (neededReplications) {
             FSDirectory.INode fileINode = dir.getFileByBlock(block);
             if( fileINode == null )  // block does not belong to any file
-                return;
+                return block;
 
             // filter out containingNodes that are marked for decommission.
             int numCurrentReplica = countContainingNodes(containingNodes);
@@ -1612,6 +1626,7 @@
 
             proccessOverReplicatedBlock( block, fileReplication );
         }
+        return block;
     }
     
     /**
@@ -1620,7 +1635,7 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-      SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+      List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
       if( containingNodes == null )
         return;
       Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
@@ -1700,7 +1715,7 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
           NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
             +block.getBlockName()+" has already been removed from node "+node );
@@ -1759,14 +1774,9 @@
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
         //
-        // Modify the blocks->datanode map
+        // Modify the blocks->datanode map and node's map.
         // 
-        addStoredBlock(block, node);
-
-        //
-        // Supplement node's blockreport
-        //
-        node.addBlock(block);
+        node.addBlock( addStoredBlock(block, node) );
     }
 
     /**