Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/07/27 07:46:55 UTC

svn commit: r1151339 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/blockmana...

Author: szetszwo
Date: Wed Jul 27 05:46:52 2011
New Revision: 1151339

URL: http://svn.apache.org/viewvc?rev=1151339&view=rev
Log:
HDFS-2191.  Move datanodeMap from FSNamesystem to DatanodeManager.

Added:
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
      - copied, changed from r1151310, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Removed:
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
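
The net effect of HDFS-2191 is that the StorageID -> DatanodeDescriptor map, together with
the helpers built on it (getDatanode, datanodeDump, isDatanodeDead, heartbeat handling,
live/dead counts), moves out of FSNamesystem and behind BlockManager.getDatanodeManager().
Below is a minimal sketch of the new lookup path; the helper class is illustrative only and
is not part of this commit.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

    class DatanodeLookupSketch {
      /** Resolve a DatanodeID to its descriptor under the namesystem read lock. */
      static DatanodeDescriptor lookup(FSNamesystem ns, DatanodeID id) throws IOException {
        ns.readLock();
        try {
          // before this change: ns.getDatanode(id)
          return ns.getBlockManager().getDatanodeManager().getDatanode(id);
        } finally {
          ns.readUnlock();
        }
      }
    }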

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Wed Jul 27 05:46:52 2011
@@ -604,6 +604,9 @@ Trunk (unreleased changes)
     HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp
     implementations. (Ivan Kelly via todd)
 
+    HDFS-2191.  Move datanodeMap from FSNamesystem to DatanodeManager.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jul 27 05:46:52 2011
@@ -42,9 +42,11 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -53,6 +55,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
@@ -156,7 +160,7 @@ public class BlockManager {
   public final int defaultReplication;
   /** The maximum number of entries returned by getCorruptInodes() */
   final int maxCorruptFilesReturned;
-  
+
   /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
 
@@ -294,15 +298,14 @@ public class BlockManager {
       }
     }
 
-    //
     // Dump blocks from pendingReplication
-    //
     pendingReplications.metaSave(out);
 
-    //
     // Dump blocks that are waiting to be deleted
-    //
     dumpRecentInvalidateSets(out);
+
+    // Dump all datanodes
+    getDatanodeManager().datanodeDump(out);
   }
 
   /**
@@ -453,7 +456,7 @@ public class BlockManager {
   /**
    * Get all valid locations of the block
    */
-  public ArrayList<String> getValidLocations(Block block) {
+  private List<String> getValidLocations(Block block) {
     ArrayList<String> machineSet =
       new ArrayList<String>(blocksMap.numNodes(block));
     for(Iterator<DatanodeDescriptor> it =
@@ -562,6 +565,49 @@ public class BlockManager {
                             minReplication);
   }
 
+   /** Get all blocks with location information from a datanode. */
+  public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
+      final long size) throws UnregisteredNodeException {
+    final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
+    if (node == null) {
+      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+          + "Asking for blocks from an unrecorded node " + datanode.getName());
+      throw new HadoopIllegalArgumentException(
+          "Datanode " + datanode.getName() + " not found.");
+    }
+
+    int numBlocks = node.numBlocks();
+    if(numBlocks == 0) {
+      return new BlocksWithLocations(new BlockWithLocations[0]);
+    }
+    Iterator<BlockInfo> iter = node.getBlockIterator();
+    int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
+    // skip blocks
+    for(int i=0; i<startBlock; i++) {
+      iter.next();
+    }
+    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+    long totalSize = 0;
+    BlockInfo curBlock;
+    while(totalSize<size && iter.hasNext()) {
+      curBlock = iter.next();
+      if(!curBlock.isComplete())  continue;
+      totalSize += addBlock(curBlock, results);
+    }
+    if(totalSize<size) {
+      iter = node.getBlockIterator(); // start from the beginning
+      for(int i=0; i<startBlock&&totalSize<size; i++) {
+        curBlock = iter.next();
+        if(!curBlock.isComplete())  continue;
+        totalSize += addBlock(curBlock, results);
+      }
+    }
+
+    return new BlocksWithLocations(
+        results.toArray(new BlockWithLocations[results.size()]));
+  }
+
+   
   /** Remove a datanode. */
   public void removeDatanode(final DatanodeDescriptor node) {
     final Iterator<? extends Block> it = node.getBlockIterator();
@@ -660,7 +706,7 @@ public class BlockManager {
     for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
       Collection<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
-        out.println(namesystem.getDatanode(entry.getKey()).getName() + blocks);
+        out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
       }
     }
   }
@@ -684,7 +730,7 @@ public class BlockManager {
   private void markBlockAsCorrupt(BlockInfo storedBlock,
                                   DatanodeInfo dn) throws IOException {
     assert storedBlock != null : "storedBlock should not be null";
-    DatanodeDescriptor node = namesystem.getDatanode(dn);
+    DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot mark block " + 
                             storedBlock.getBlockName() +
@@ -723,7 +769,7 @@ public class BlockManager {
       throws IOException {
     NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
                                  + blk + " on " + dn.getName());
-    DatanodeDescriptor node = namesystem.getDatanode(dn);
+    DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot invalidate block " + blk +
                             " because datanode " + dn.getName() +
@@ -748,7 +794,7 @@ public class BlockManager {
     }
   }
 
-  public void updateState() {
+  void updateState() {
     pendingReplicationBlocksCount = pendingReplications.size();
     underReplicatedBlocksCount = neededReplications.size();
     corruptReplicaBlocksCount = corruptReplicas.size();
@@ -1134,7 +1180,7 @@ public class BlockManager {
    * If there were any replication requests that timed out, reap them
    * and put them back into the neededReplication queue
    */
-  public void processPendingReplications() {
+  private void processPendingReplications() {
     Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
       namesystem.writeLock();
@@ -1700,6 +1746,7 @@ public class BlockManager {
         addedNode, delNodeHint, blockplacement);
   }
 
+
   public void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
     Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
@@ -1774,6 +1821,21 @@ public class BlockManager {
   }
 
   /**
+   * Get all valid locations of the block & add the block to results
+   * return the length of the added block; 0 if the block is not added
+   */
+  private long addBlock(Block block, List<BlockWithLocations> results) {
+    final List<String> machineSet = getValidLocations(block);
+    if(machineSet.size() == 0) {
+      return 0;
+    } else {
+      results.add(new BlockWithLocations(block, 
+          machineSet.toArray(new String[machineSet.size()])));
+      return block.getNumBytes();
+    }
+  }
+
+  /**
    * The given node is reporting that it received a certain block.
    */
   public void addBlock(DatanodeDescriptor node, Block block, String delHint)
@@ -1784,7 +1846,7 @@ public class BlockManager {
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
     if (delHint != null && delHint.length() != 0) {
-      delHintNode = namesystem.getDatanode(delHint);
+      delHintNode = datanodeManager.getDatanode(delHint);
       if (delHintNode == null) {
         NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
             + block + " is expected to be removed from an unrecorded node "
@@ -2071,7 +2133,7 @@ public class BlockManager {
         return 0;
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
-      DatanodeDescriptor dn = namesystem.getDatanode(nodeId);
+      final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId);
       if (dn == null) {
         removeFromInvalidates(nodeId);
         return 0;
@@ -2082,11 +2144,11 @@ public class BlockManager {
         return 0;
 
       ArrayList<Block> blocksToInvalidate = new ArrayList<Block>(
-          namesystem.blockInvalidateLimit);
+          getDatanodeManager().blockInvalidateLimit);
 
       // # blocks that can be sent in one message is limited
       Iterator<Block> it = invalidateSet.iterator();
-      for (int blkCount = 0; blkCount < namesystem.blockInvalidateLimit
+      for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
           && it.hasNext(); blkCount++) {
         blocksToInvalidate.add(it.next());
         it.remove();
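
The new BlockManager.getBlocksWithLocations() above starts the scan at a random block index
on the datanode, walks forward until it has collected roughly "size" bytes of complete
blocks, and then wraps around over the skipped prefix if needed. A stand-alone sketch of
that selection strategy, operating on a plain list of block lengths purely for illustration
(not part of this commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    class RandomStartScanSketch {
      /** Pick entries starting at a random index, wrapping around, until "size" is reached. */
      static List<Long> pick(List<Long> blockSizes, long size, Random r) {
        final List<Long> results = new ArrayList<Long>();
        if (blockSizes.isEmpty()) {
          return results;
        }
        final int start = r.nextInt(blockSizes.size());
        long total = 0;
        // forward pass from the random starting point
        for (int i = start; i < blockSizes.size() && total < size; i++) {
          results.add(blockSizes.get(i));
          total += blockSizes.get(i);
        }
        // wrap around over the prefix that was skipped
        for (int i = 0; i < start && total < size; i++) {
          results.add(blockSizes.get(i));
          total += blockSizes.get(i);
        }
        return results;
      }
    }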

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Jul 27 05:46:52 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NavigableMap;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,12 +38,22 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.ipc.Server;
@@ -48,6 +61,7 @@ import org.apache.hadoop.net.CachedDNSTo
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.CyclicIteration;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -62,6 +76,30 @@ public class DatanodeManager {
 
   final FSNamesystem namesystem;
 
+  /**
+   * Stores the datanode -> block map.  
+   * <p>
+   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
+   * storage id. In order to keep the storage map consistent it tracks 
+   * all storages ever registered with the namenode.
+   * A descriptor corresponding to a specific storage id can be
+   * <ul> 
+   * <li>added to the map if it is a new storage id;</li>
+   * <li>updated with a new datanode started as a replacement for the old one 
+   * with the same storage id; and </li>
+   * <li>removed if and only if an existing datanode is restarted to serve a
+   * different storage id.</li>
+   * </ul> <br>
+   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+   * in the namespace image file. Only the {@link DatanodeInfo} part is 
+   * persistent, the list of blocks is restored from the datanode block
+   * reports. 
+   * <p>
+   * Mapping: StorageID -> DatanodeDescriptor
+   */
+  private final NavigableMap<String, DatanodeDescriptor> datanodeMap
+      = new TreeMap<String, DatanodeDescriptor>();
+
   /** Cluster network topology */
   private final NetworkTopology networktopology = new NetworkTopology();
 
@@ -71,7 +109,12 @@ public class DatanodeManager {
   private final DNSToSwitchMapping dnsToSwitchMapping;
 
   /** Read include/exclude files*/
-  private final HostsFileReader hostsReader; 
+  private final HostsFileReader hostsReader;
+
+  /** The period to wait for datanode heartbeat.*/
+  private final long heartbeatExpireInterval;
+  /** Ask Datanode only up to this many blocks to delete. */
+  final int blockInvalidateLimit;
   
   DatanodeManager(final FSNamesystem namesystem, final Configuration conf
       ) throws IOException {
@@ -90,6 +133,19 @@ public class DatanodeManager {
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
     }
+    
+    final long heartbeatIntervalSeconds = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    final int heartbeatRecheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
+    this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
+        + 10 * 1000 * heartbeatIntervalSeconds;
+    this.blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
+        DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
+    LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+        + "=" + this.blockInvalidateLimit);
   }
 
   private Daemon decommissionthread = null;
@@ -124,20 +180,88 @@ public class DatanodeManager {
       Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
     }    
   }
-  
+
+  CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
+      final String firstkey) {
+    return new CyclicIteration<String, DatanodeDescriptor>(
+        datanodeMap, firstkey);
+  }
+
   /** @return the datanode descriptor for the host. */
   public DatanodeDescriptor getDatanodeByHost(final String host) {
     return host2DatanodeMap.getDatanodeByHost(host);
   }
 
+  /** Get a datanode descriptor given corresponding storageID */
+  DatanodeDescriptor getDatanode(final String storageID) {
+    return datanodeMap.get(storageID);
+  }
+
+  /**
+   * Get data node by storage ID.
+   * 
+   * @param nodeID
+   * @return DatanodeDescriptor or null if the node is not found.
+   * @throws UnregisteredNodeException
+   */
+  public DatanodeDescriptor getDatanode(DatanodeID nodeID
+      ) throws UnregisteredNodeException {
+    final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
+    if (node == null) 
+      return null;
+    if (!node.getName().equals(nodeID.getName())) {
+      final UnregisteredNodeException e = new UnregisteredNodeException(
+          nodeID, node);
+      NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
+                                    + e.getLocalizedMessage());
+      throw e;
+    }
+    return node;
+  }
+
+  /** Prints information about all datanodes. */
+  void datanodeDump(final PrintWriter out) {
+    synchronized (datanodeMap) {
+      out.println("Metasave: Number of datanodes: " + datanodeMap.size());
+      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
+        DatanodeDescriptor node = it.next();
+        out.println(node.dumpDatanode());
+      }
+    }
+  }
+
+  /** Remove a dead datanode. */
+  public void removeDeadDatanode(final DatanodeID nodeID) {
+    synchronized(namesystem.heartbeats) {
+      synchronized(datanodeMap) {
+        DatanodeDescriptor d;
+        try {
+          d = getDatanode(nodeID);
+        } catch(IOException e) {
+          d = null;
+        }
+        if (d != null && isDatanodeDead(d)) {
+          NameNode.stateChangeLog.info(
+              "BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
+          namesystem.removeDatanode(d);
+        }
+      }
+    }
+  }
+
+  /** Is the datanode dead? */
+  public boolean isDatanodeDead(DatanodeDescriptor node) {
+    return (node.getLastUpdate() <
+            (Util.now() - heartbeatExpireInterval));
+  }
+
   /** Add a datanode. */
   private void addDatanode(final DatanodeDescriptor node) {
     // To keep host2DatanodeMap consistent with datanodeMap,
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
-    synchronized (namesystem.datanodeMap) {
-      host2DatanodeMap.remove(
-          namesystem.datanodeMap.put(node.getStorageID(), node));
+    synchronized(datanodeMap) {
+      host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
     }
 
     host2DatanodeMap.add(node);
@@ -152,8 +276,8 @@ public class DatanodeManager {
   /** Physically remove node from datanodeMap. */
   private void wipeDatanode(final DatanodeID node) throws IOException {
     final String key = node.getStorageID();
-    synchronized (namesystem.datanodeMap) {
-      host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
+    synchronized (datanodeMap) {
+      host2DatanodeMap.remove(datanodeMap.remove(key));
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
@@ -315,7 +439,7 @@ public class DatanodeManager {
     String newID = null;
     while(newID == null) {
       newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
-      if (namesystem.datanodeMap.get(newID) != null)
+      if (datanodeMap.get(newID) != null)
         newID = null;
     }
     return newID;
@@ -350,7 +474,7 @@ public class DatanodeManager {
         + "node registration from " + nodeReg.getName()
         + " storage " + nodeReg.getStorageID());
 
-    DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
+    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
     DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
       
     if (nodeN != null && nodeN != nodeS) {
@@ -461,7 +585,7 @@ public class DatanodeManager {
    * 4. Removed from exclude --> stop decommission.
    */
   public void refreshDatanodes() throws IOException {
-    for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
+    for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!inHostsList(node, null)) {
         node.setDisallowed(true);  // case 2.
@@ -475,6 +599,45 @@ public class DatanodeManager {
     }
   }
 
+  /** @return the number of live datanodes. */
+  public int getNumLiveDataNodes() {
+    int numLive = 0;
+    synchronized (datanodeMap) {
+      for(DatanodeDescriptor dn : datanodeMap.values()) {
+        if (!isDatanodeDead(dn) ) {
+          numLive++;
+        }
+      }
+    }
+    return numLive;
+  }
+
+  /** @return the number of dead datanodes. */
+  public int getNumDeadDataNodes() {
+    int numDead = 0;
+    synchronized (datanodeMap) {   
+      for(DatanodeDescriptor dn : datanodeMap.values()) {
+        if (isDatanodeDead(dn) ) {
+          numDead++;
+        }
+      }
+    }
+    return numDead;
+  }
+
+  /** Fetch live and dead datanodes. */
+  public void fetchDatanodess(final List<DatanodeDescriptor> live, 
+      final List<DatanodeDescriptor> dead) {
+    final List<DatanodeDescriptor> results =
+        getDatanodeListForReport(DatanodeReportType.ALL);    
+    for(DatanodeDescriptor node : results) {
+      if (isDatanodeDead(node))
+        dead.add(node);
+      else
+        live.add(node);
+    }
+  }
+
   /** For generating datanode reports */
   public List<DatanodeDescriptor> getDatanodeListForReport(
       final DatanodeReportType type) {
@@ -499,13 +662,13 @@ public class DatanodeManager {
 
     ArrayList<DatanodeDescriptor> nodes = null;
     
-    synchronized (namesystem.datanodeMap) {
-      nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() + 
+    synchronized(datanodeMap) {
+      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
                                                 mustList.size());
-      Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
+      Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
       while (it.hasNext()) { 
         DatanodeDescriptor dn = it.next();
-        boolean isDead = namesystem.isDatanodeDead(dn);
+        final boolean isDead = isDatanodeDead(dn);
         if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
           nodes.add(dn);
         }
@@ -537,4 +700,77 @@ public class DatanodeManager {
     }
     return nodes;
   }
+  
+  private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+    node.setLastUpdate(0);
+  }
+
+  /** Handle heartbeat from datanodes. */
+  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
+      final String blockPoolId,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int maxTransfers, int failedVolumes
+      ) throws IOException {
+    synchronized (namesystem.heartbeats) {
+      synchronized (datanodeMap) {
+        DatanodeDescriptor nodeinfo = null;
+        try {
+          nodeinfo = getDatanode(nodeReg);
+        } catch(UnregisteredNodeException e) {
+          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+        }
+        
+        // Check if this datanode should actually be shutdown instead. 
+        if (nodeinfo != null && nodeinfo.isDisallowed()) {
+          setDatanodeDead(nodeinfo);
+          throw new DisallowedDatanodeException(nodeinfo);
+        }
+         
+        if (nodeinfo == null || !nodeinfo.isAlive) {
+          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+        }
+
+        namesystem.updateStats(nodeinfo, false);
+        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
+            xceiverCount, failedVolumes);
+        namesystem.updateStats(nodeinfo, true);
+        
+        //check lease recovery
+        BlockInfoUnderConstruction[] blocks = nodeinfo
+            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        if (blocks != null) {
+          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
+              blocks.length);
+          for (BlockInfoUnderConstruction b : blocks) {
+            brCommand.add(new RecoveringBlock(
+                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
+                    .getBlockRecoveryId()));
+          }
+          return new DatanodeCommand[] { brCommand };
+        }
+
+        final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
+        //check pending replication
+        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+              maxTransfers);
+        if (pendingList != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+              pendingList));
+        }
+        //check block invalidation
+        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        if (blks != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
+              blockPoolId, blks));
+        }
+        
+        namesystem.addKeyUpdateCommand(cmds, nodeinfo);
+        if (!cmds.isEmpty()) {
+          return cmds.toArray(new DatanodeCommand[cmds.size()]);
+        }
+      }
+    }
+
+    return null;
+  }
 }
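
The DatanodeManager constructor above now owns the heartbeat timing: heartbeatExpireInterval
is 2 * recheck interval + 10 * heartbeat interval (converted to milliseconds), and
blockInvalidateLimit is clamped to at least 20 heartbeat intervals' worth of deletions,
i.e. Math.max(20 * heartbeatIntervalSeconds, DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT). A small
sketch of the expiry arithmetic, assuming the stock defaults of a 3 second heartbeat and a
5 minute recheck (not part of this commit):

    class HeartbeatIntervalSketch {
      public static void main(String[] args) {
        final long heartbeatIntervalSeconds = 3;             // assumed dfs.heartbeat.interval default
        final int heartbeatRecheckInterval = 5 * 60 * 1000;  // 5 minutes, in milliseconds
        final long heartbeatExpireInterval =
            2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;
        // prints 630000: a datanode with no heartbeat for 10.5 minutes is considered dead
        System.out.println(heartbeatExpireInterval);
      }
    }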

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Wed Jul 27 05:46:52 2011
@@ -83,8 +83,8 @@ class DecommissionManager {
     private void check() {
       int count = 0;
       for(Map.Entry<String, DatanodeDescriptor> entry
-          : new CyclicIteration<String, DatanodeDescriptor>(
-              fsnamesystem.datanodeMap, firstkey)) {
+          : blockManager.getDatanodeManager().getDatanodeCyclicIteration(
+              firstkey)) {
         final DatanodeDescriptor d = entry.getValue();
         firstkey = entry.getKey();
 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Jul 27 05:46:52 2011
@@ -237,35 +237,9 @@ public class FSNamesystem implements FSC
   
   // Block pool ID used by this namenode
   String blockPoolId;
-    
-  /**
-   * Stores the datanode -> block map.  
-   * <p>
-   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
-   * storage id. In order to keep the storage map consistent it tracks 
-   * all storages ever registered with the namenode.
-   * A descriptor corresponding to a specific storage id can be
-   * <ul> 
-   * <li>added to the map if it is a new storage id;</li>
-   * <li>updated with a new datanode started as a replacement for the old one 
-   * with the same storage id; and </li>
-   * <li>removed if and only if an existing datanode is restarted to serve a
-   * different storage id.</li>
-   * </ul> <br>
-   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
-   * in the namespace image file. Only the {@link DatanodeInfo} part is 
-   * persistent, the list of blocks is restored from the datanode block
-   * reports. 
-   * <p>
-   * Mapping: StorageID -> DatanodeDescriptor
-   */
-  public final NavigableMap<String, DatanodeDescriptor> datanodeMap = 
-    new TreeMap<String, DatanodeDescriptor>();
 
   /**
-   * Stores a set of DatanodeDescriptor objects.
-   * This is a subset of {@link #datanodeMap}, containing nodes that are 
-   * considered alive.
+   * Stores a subset of datanodeMap, containing nodes that are considered alive.
    * The HeartbeatMonitor periodically checks for out-dated entries,
    * and removes them from the list.
    */
@@ -289,9 +263,6 @@ public class FSNamesystem implements FSC
 
   // heartbeatRecheckInterval is how often namenode checks for expired datanodes
   private long heartbeatRecheckInterval;
-  // heartbeatExpireInterval is how long namenode waits for datanode to report
-  // heartbeat
-  private long heartbeatExpireInterval;
 
   //resourceRecheckInterval is how often namenode checks for the disk space availability
   private long resourceRecheckInterval;
@@ -314,9 +285,6 @@ public class FSNamesystem implements FSC
    */
   private final GenerationStamp generationStamp = new GenerationStamp();
 
-  // Ask Datanode only up to this many blocks to delete.
-  public int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
-
   // precision of access times.
   private long accessTimePrecision = 0;
 
@@ -513,14 +481,9 @@ public class FSNamesystem implements FSC
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
 
-    long heartbeatInterval = conf.getLong(
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
     this.heartbeatRecheckInterval = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
-    this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
-      10 * heartbeatInterval;
     
     this.serverDefaults = new FsServerDefaults(
         conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
@@ -531,14 +494,6 @@ public class FSNamesystem implements FSC
     this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                      DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
 
-    //default limit
-    this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 
-                                         20*(int)(heartbeatInterval/1000));
-    //use conf value if it is set.
-    this.blockInvalidateLimit = conf.getInt(
-        DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, this.blockInvalidateLimit);
-    LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit);
-
     this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
     this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
                                       DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
@@ -642,12 +597,7 @@ public class FSNamesystem implements FSC
       out.println("Live Datanodes: "+live.size());
       out.println("Dead Datanodes: "+dead.size());
       blockManager.metaSave(out);
-  
-      //
-      // Dump all datanodes
-      //
-      datanodeDump(out);
-  
+
       out.flush();
       out.close();
     } finally {
@@ -688,45 +638,7 @@ public class FSNamesystem implements FSC
     readLock();
     try {
       checkSuperuserPrivilege();
-  
-      DatanodeDescriptor node = getDatanode(datanode);
-      if (node == null) {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
-            + "Asking for blocks from an unrecorded node " + datanode.getName());
-        throw new IllegalArgumentException(
-            "Unexpected exception.  Got getBlocks message for datanode " +
-            datanode.getName() + ", but there is no info for it");
-      }
-  
-      int numBlocks = node.numBlocks();
-      if(numBlocks == 0) {
-        return new BlocksWithLocations(new BlockWithLocations[0]);
-      }
-      Iterator<BlockInfo> iter = node.getBlockIterator();
-      int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
-      // skip blocks
-      for(int i=0; i<startBlock; i++) {
-        iter.next();
-      }
-      List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
-      long totalSize = 0;
-      BlockInfo curBlock;
-      while(totalSize<size && iter.hasNext()) {
-        curBlock = iter.next();
-        if(!curBlock.isComplete())  continue;
-        totalSize += addBlock(curBlock, results);
-      }
-      if(totalSize<size) {
-        iter = node.getBlockIterator(); // start from the beginning
-        for(int i=0; i<startBlock&&totalSize<size; i++) {
-          curBlock = iter.next();
-          if(!curBlock.isComplete())  continue;
-          totalSize += addBlock(curBlock, results);
-        }
-      }
-  
-      return new BlocksWithLocations(
-          results.toArray(new BlockWithLocations[results.size()]));
+      return blockManager.getBlocksWithLocations(datanode, size);  
     } finally {
       readUnlock();
     }
@@ -742,22 +654,6 @@ public class FSNamesystem implements FSC
         : ExportedBlockKeys.DUMMY_KEYS;
   }
 
-  /**
-   * Get all valid locations of the block & add the block to results
-   * return the length of the added block; 0 if the block is not added
-   */
-  private long addBlock(Block block, List<BlockWithLocations> results) {
-    assert hasReadOrWriteLock();
-    ArrayList<String> machineSet = blockManager.getValidLocations(block);
-    if(machineSet.size() == 0) {
-      return 0;
-    } else {
-      results.add(new BlockWithLocations(block, 
-          machineSet.toArray(new String[machineSet.size()])));
-      return block.getNumBytes();
-    }
-  }
-
   /////////////////////////////////////////////////////////
   //
   // These methods are called by HadoopFS clients
@@ -1795,7 +1691,8 @@ public class FSNamesystem implements FSC
       //find datanode descriptors
       chosen = new ArrayList<DatanodeDescriptor>();
       for(DatanodeInfo d : existings) {
-        final DatanodeDescriptor descriptor = getDatanode(d);
+        final DatanodeDescriptor descriptor = blockManager.getDatanodeManager(
+            ).getDatanode(d);
         if (descriptor != null) {
           chosen.add(descriptor);
         }
@@ -2622,7 +2519,8 @@ public class FSNamesystem implements FSC
         if (newtargets.length > 0) {
           descriptors = new DatanodeDescriptor[newtargets.length];
           for(int i = 0; i < newtargets.length; i++) {
-            descriptors[i] = getDatanode(newtargets[i]);
+            descriptors[i] = blockManager.getDatanodeManager().getDatanode(
+                newtargets[i]);
           }
         }
         if (closeFile) {
@@ -2766,15 +2664,6 @@ public class FSNamesystem implements FSC
     return Storage.getRegistrationID(dir.fsImage.getStorage());
   }
 
-  public boolean isDatanodeDead(DatanodeDescriptor node) {
-    return (node.getLastUpdate() <
-            (now() - heartbeatExpireInterval));
-  }
-    
-  private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
-    node.setLastUpdate(0);
-  }
-
   /**
    * The given node has reported in.  This method should:
    * 1) Record the heartbeat, so the datanode isn't timed out
@@ -2792,91 +2681,32 @@ public class FSNamesystem implements FSC
         throws IOException {
     readLock();
     try {
-      return handleHeartbeatInternal(nodeReg, capacity, dfsUsed, 
-          remaining, blockPoolUsed, xceiverCount, xmitsInProgress, 
-          failedVolumes);
+      final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress;
+      DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
+          nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
+          xceiverCount, maxTransfer, failedVolumes);
+      if (cmds != null) {
+        return cmds;
+      }
+
+      //check distributed upgrade
+      DatanodeCommand cmd = getDistributedUpgradeCommand();
+      if (cmd != null) {
+        return new DatanodeCommand[] {cmd};
+      }
+      return null;
     } finally {
       readUnlock();
     }
   }
 
-  /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
-  DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int xmitsInProgress, int failedVolumes) 
-        throws IOException {
-    assert hasReadLock();
-    DatanodeCommand cmd = null;
-    synchronized (heartbeats) {
-      synchronized (datanodeMap) {
-        DatanodeDescriptor nodeinfo = null;
-        try {
-          nodeinfo = getDatanode(nodeReg);
-        } catch(UnregisteredNodeException e) {
-          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
-        }
-        
-        // Check if this datanode should actually be shutdown instead. 
-        if (nodeinfo != null && nodeinfo.isDisallowed()) {
-          setDatanodeDead(nodeinfo);
-          throw new DisallowedDatanodeException(nodeinfo);
-        }
-         
-        if (nodeinfo == null || !nodeinfo.isAlive) {
-          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
-        }
-
-        updateStats(nodeinfo, false);
-        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-            xceiverCount, failedVolumes);
-        updateStats(nodeinfo, true);
-        
-        //check lease recovery
-        BlockInfoUnderConstruction[] blocks = nodeinfo
-            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (blocks != null) {
-          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
-              blocks.length);
-          for (BlockInfoUnderConstruction b : blocks) {
-            brCommand.add(new RecoveringBlock(
-                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
-                    .getBlockRecoveryId()));
-          }
-          return new DatanodeCommand[] { brCommand };
-        }
-      
-        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
-        //check pending replication
-        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
-              blockManager.maxReplicationStreams - xmitsInProgress);
-        if (pendingList != null) {
-          cmd = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-              pendingList);
-          cmds.add(cmd);
-        }
-        //check block invalidation
-        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
-        if (blks != null) {
-          cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks);
-          cmds.add(cmd);
-        }
-        // check access key update
-        if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
-          cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
-          nodeinfo.needKeyUpdate = false;
-        }
-        if (!cmds.isEmpty()) {
-          return cmds.toArray(new DatanodeCommand[cmds.size()]);
-        }
-      }
-    }
-
-    //check distributed upgrade
-    cmd = getDistributedUpgradeCommand();
-    if (cmd != null) {
-      return new DatanodeCommand[] {cmd};
+  public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
+      final DatanodeDescriptor nodeinfo) {
+    // check access key update
+    if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
+      cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
+      nodeinfo.needKeyUpdate = false;
     }
-    return null;
   }
 
   public void updateStats(DatanodeDescriptor node, boolean isAdded) {
@@ -3017,7 +2847,8 @@ public class FSNamesystem implements FSC
       ) throws UnregisteredNodeException {
     writeLock();
     try {
-      DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+      DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager(
+          ).getDatanode(nodeID);
       if (nodeInfo != null) {
         removeDatanode(nodeInfo);
       } else {
@@ -3033,7 +2864,7 @@ public class FSNamesystem implements FSC
    * Remove a datanode descriptor.
    * @param nodeInfo datanode descriptor.
    */
-  private void removeDatanode(DatanodeDescriptor nodeInfo) {
+  public void removeDatanode(DatanodeDescriptor nodeInfo) {
     assert hasWriteLock();
     synchronized (heartbeats) {
       if (nodeInfo.isAlive) {
@@ -3064,6 +2895,7 @@ public class FSNamesystem implements FSC
    * effect causes more datanodes to be declared dead.
    */
   void heartbeatCheck() {
+    final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager();
     // It's OK to check safe mode w/o taking the lock here, we re-check
     // for safe mode after taking the lock before removing a datanode.
     if (isInSafeMode()) {
@@ -3079,7 +2911,7 @@ public class FSNamesystem implements FSC
         for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
              it.hasNext();) {
           DatanodeDescriptor nodeInfo = it.next();
-          if (isDatanodeDead(nodeInfo)) {
+          if (datanodeManager.isDatanodeDead(nodeInfo)) {
             expiredHeartbeats.incr();
             foundDead = true;
             nodeID = nodeInfo;
@@ -3095,21 +2927,7 @@ public class FSNamesystem implements FSC
           return;
         }
         try {
-          synchronized(heartbeats) {
-            synchronized (datanodeMap) {
-              DatanodeDescriptor nodeInfo = null;
-              try {
-                nodeInfo = getDatanode(nodeID);
-              } catch (IOException e) {
-                nodeInfo = null;
-              }
-              if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
-                NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
-                                             + "lost heartbeat from " + nodeInfo.getName());
-                removeDatanode(nodeInfo);
-              }
-            }
-          }
+          datanodeManager.removeDeadDatanode(nodeID);
         } finally {
           writeUnlock();
         }
@@ -3129,7 +2947,8 @@ public class FSNamesystem implements FSC
     writeLock();
     startTime = now(); //after acquiring write lock
     try {
-      DatanodeDescriptor node = getDatanode(nodeID);
+      final DatanodeDescriptor node = blockManager.getDatanodeManager(
+          ).getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         throw new IOException("ProcessReport from dead or unregistered node: "
                               + nodeID.getName());
@@ -3269,7 +3088,8 @@ public class FSNamesystem implements FSC
                                          ) throws IOException {
     writeLock();
     try {
-      DatanodeDescriptor node = getDatanode(nodeID);
+      final DatanodeDescriptor node = blockManager.getDatanodeManager(
+          ).getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
             + " is received from dead or unregistered node " + nodeID.getName());
@@ -3475,33 +3295,7 @@ public class FSNamesystem implements FSC
                                           ArrayList<DatanodeDescriptor> dead) {
     readLock();
     try {
-      final List<DatanodeDescriptor> results = getBlockManager(
-          ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL);    
-      for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
-        DatanodeDescriptor node = it.next();
-        if (isDatanodeDead(node))
-          dead.add(node);
-        else
-          live.add(node);
-      }
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Prints information about all datanodes.
-   */
-  private void datanodeDump(PrintWriter out) {
-    readLock();
-    try {
-      synchronized (datanodeMap) {
-        out.println("Metasave: Number of datanodes: " + datanodeMap.size());
-        for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
-          DatanodeDescriptor node = it.next();
-          out.println(node.dumpDatanode());
-        }
-      }
+      getBlockManager().getDatanodeManager().fetchDatanodess(live, dead);
     } finally {
       readUnlock();
     }
@@ -3556,30 +3350,6 @@ public class FSNamesystem implements FSC
     checkSuperuserPrivilege();
     getFSImage().finalizeUpgrade();
   }
-    
-    
-  /**
-   * Get data node by storage ID.
-   * 
-   * @param nodeID
-   * @return DatanodeDescriptor or null if the node is not found.
-   * @throws IOException
-   */
-  public DatanodeDescriptor getDatanode(DatanodeID nodeID
-      ) throws UnregisteredNodeException {
-    assert hasReadOrWriteLock();
-    UnregisteredNodeException e = null;
-    DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
-    if (node == null) 
-      return null;
-    if (!node.getName().equals(nodeID.getName())) {
-      e = new UnregisteredNodeException(nodeID, node);
-      NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
-                                    + e.getLocalizedMessage());
-      throw e;
-    }
-    return node;
-  }
 
   /**
    * SafeModeInfo contains information related to the safe mode.
@@ -4503,43 +4273,14 @@ public class FSNamesystem implements FSC
   }
   
 
-  /**
-   * Number of live data nodes
-   * @return Number of live data nodes
-   */
   @Override // FSNamesystemMBean
   public int getNumLiveDataNodes() {
-    int numLive = 0;
-    synchronized (datanodeMap) {   
-      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
-                                                               it.hasNext();) {
-        DatanodeDescriptor dn = it.next();
-        if (!isDatanodeDead(dn) ) {
-          numLive++;
-        }
-      }
-    }
-    return numLive;
+    return getBlockManager().getDatanodeManager().getNumLiveDataNodes();
   }
-  
 
-  /**
-   * Number of dead data nodes
-   * @return Number of dead data nodes
-   */
   @Override // FSNamesystemMBean
   public int getNumDeadDataNodes() {
-    int numDead = 0;
-    synchronized (datanodeMap) {   
-      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
-                                                               it.hasNext();) {
-        DatanodeDescriptor dn = it.next();
-        if (isDatanodeDead(dn) ) {
-          numDead++;
-        }
-      }
-    }
-    return numDead;
+    return getBlockManager().getDatanodeManager().getNumDeadDataNodes();
   }
 
   /**
@@ -4699,11 +4440,12 @@ public class FSNamesystem implements FSC
     blockinfo.setNumBytes(newBlock.getNumBytes());
 
     // find the DatanodeDescriptor objects
+    final DatanodeManager dm = getBlockManager().getDatanodeManager();
     DatanodeDescriptor[] descriptors = null;
     if (newNodes.length > 0) {
       descriptors = new DatanodeDescriptor[newNodes.length];
       for(int i = 0; i < newNodes.length; i++) {
-        descriptors[i] = getDatanode(newNodes[i]);
+        descriptors[i] = dm.getDatanode(newNodes[i]);
       }
     }
     blockinfo.setExpectedLocations(descriptors);
@@ -4832,12 +4574,6 @@ public class FSNamesystem implements FSC
     return blockManager.numCorruptReplicas(blk);
   }
 
-  /** Get a datanode descriptor given corresponding storageID */
-  public DatanodeDescriptor getDatanode(String nodeID) {
-    assert hasReadOrWriteLock();
-    return datanodeMap.get(nodeID);
-  }
-
   /**
    * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
    * blocks starting at the next block after startingBlockId are returned

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Wed Jul 27 05:46:52 2011
@@ -226,7 +226,7 @@ public class TestDecommission {
     writeConfigFile(excludeFile, nodes);
     cluster.getNamesystem(nnIndex).refreshNodes(conf);
     DatanodeInfo ret = NameNodeAdapter.getDatanode(
-        cluster.getNameNode(nnIndex), info[index]);
+        cluster.getNamesystem(nnIndex), info[index]);
     waitNodeState(ret, waitForState);
     return ret;
   }
@@ -466,7 +466,7 @@ public class TestDecommission {
       // Stop decommissioning and verify stats
       writeConfigFile(excludeFile, null);
       fsn.refreshNodes(conf);
-      DatanodeInfo ret = NameNodeAdapter.getDatanode(namenode, downnode);
+      DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
       waitNodeState(ret, AdminStates.NORMAL);
       verifyStats(namenode, fsn, ret, false);
     }

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Wed Jul 27 05:46:52 2011
@@ -25,9 +25,30 @@ import java.util.Set;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.Daemon;
 
 public class BlockManagerTestUtil {
+
+  /** @return the datanode descriptor for the given the given storageID. */
+  public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
+      final String storageID) {
+    ns.readLock();
+    try {
+      return ns.getBlockManager().getDatanodeManager().getDatanode(storageID);
+    } finally {
+      ns.readUnlock();
+    }
+  }
+
+
+  /**
+   * Refresh block queue counts on the name-node.
+   */
+  public static void updateState(final BlockManager blockManager) {
+    blockManager.updateState();
+  }
+
   /**
    * @return a tuple of the replica state (number racks, number live
    * replicas, and number needed replicas) for the given block.

Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java (from r1151310, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java&r1=1151310&r2=1151339&rev=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java Wed Jul 27 05:46:52 2011
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import junit.framework.TestCase;
 
@@ -23,8 +23,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 
 /**
  * Test if FSNamesystem handles heartbeat right
@@ -41,6 +43,8 @@ public class TestComputeInvalidateWork e
     try {
       cluster.waitActive();
       final FSNamesystem namesystem = cluster.getNamesystem();
+      final BlockManager bm = namesystem.getBlockManager();
+      final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
       DatanodeDescriptor[] nodes =
         namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
       assertEquals(nodes.length, NUM_OF_DATANODES);
@@ -48,26 +52,25 @@ public class TestComputeInvalidateWork e
       namesystem.writeLock();
       try {
         for (int i=0; i<nodes.length; i++) {
-          for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
-            Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0, 
+          for(int j=0; j<3*blockInvalidateLimit+1; j++) {
+            Block block = new Block(i*(blockInvalidateLimit+1)+j, 0, 
                 GenerationStamp.FIRST_VALID_STAMP);
-            namesystem.getBlockManager().addToInvalidates(block, nodes[i]);
+            bm.addToInvalidates(block, nodes[i]);
           }
         }
         
-        assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES, 
-            namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES+1));
-        assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES, 
-            namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES));
-        assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1), 
-            namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES-1));
-        int workCount = namesystem.getBlockManager().computeInvalidateWork(1);
+        assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, 
+            bm.computeInvalidateWork(NUM_OF_DATANODES+1));
+        assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, 
+            bm.computeInvalidateWork(NUM_OF_DATANODES));
+        assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1), 
+            bm.computeInvalidateWork(NUM_OF_DATANODES-1));
+        int workCount = bm.computeInvalidateWork(1);
         if (workCount == 1) {
-          assertEquals(namesystem.blockInvalidateLimit+1, 
-              namesystem.getBlockManager().computeInvalidateWork(2));
+          assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
         } else {
-          assertEquals(workCount, namesystem.blockInvalidateLimit);
-          assertEquals(2, namesystem.getBlockManager().computeInvalidateWork(2));
+          assertEquals(workCount, blockInvalidateLimit);
+          assertEquals(2, bm.computeInvalidateWork(2));
         }
       } finally {
         namesystem.writeUnlock();

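For orientation, here is a minimal sketch (not part of the commit; it only mirrors the hunk above and assumes a started MiniDFSCluster referenced by "cluster") of the access pattern the test now uses: the per-datanode invalidate limit is reached through BlockManager.getDatanodeManager() instead of a field on FSNamesystem.

    // Sketch only -- same pattern as the TestComputeInvalidateWork hunk above.
    // Assumes a running MiniDFSCluster referenced by "cluster".
    final FSNamesystem namesystem = cluster.getNamesystem();
    final BlockManager bm = namesystem.getBlockManager();
    final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
    // blockInvalidateLimit then replaces the old namesystem.blockInvalidateLimit
    // field everywhere the test computes expected invalidation work.
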
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Wed Jul 27 05:46:52 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -54,14 +55,9 @@ public class TestHeartbeatHandling exten
       final String poolId = namesystem.getBlockPoolId();
       final DatanodeRegistration nodeReg = 
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
-        
-      namesystem.readLock();
-      DatanodeDescriptor dd;
-      try {
-        dd = namesystem.getDatanode(nodeReg);
-      } finally {
-        namesystem.readUnlock();
-      }
+
+
+      final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
       
       final int REMAINING_BLOCKS = 1;
       final int MAX_REPLICATE_LIMIT = 

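The locking the test used to do by hand is now encapsulated in the adapter; a sketch of the resulting call site (not part of the commit, "cluster" and "nodeReg" stand for the test's existing locals) looks like this:

    // Sketch only: NameNodeAdapter.getDatanode takes the namesystem read lock
    // internally (see the NameNodeAdapter hunk below), so the test no longer
    // wraps the lookup in readLock()/readUnlock() itself.
    final DatanodeDescriptor dd =
        NameNodeAdapter.getDatanode(cluster.getNamesystem(), nodeReg);
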
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Wed Jul 27 05:46:52 2011
@@ -596,7 +596,7 @@ public class TestBlockReport {
   }
 
   private void printStats() {
-    NameNodeAdapter.refreshBlockCounts(cluster.getNameNode());
+    BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Missing " + cluster.getNamesystem().getMissingBlocksCount());
       LOG.debug("Corrupted " + cluster.getNamesystem().getCorruptReplicaBlocks());

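With refreshBlockCounts() removed from NameNodeAdapter (see the next hunk), tests refresh the block queue counters through the block management test utility instead; a one-line sketch (not part of the commit):

    // Sketch only: recomputes the under-replicated/corrupt/invalidate counters
    // by delegating to the BlockManager held by the namesystem.
    BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
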
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Jul 27 05:46:52 2011
@@ -46,14 +46,6 @@ public class NameNodeAdapter {
   }
 
   /**
-   * Refresh block queue counts on the name-node.
-   * @param namenode to proxy the invocation to
-   */
-  public static void refreshBlockCounts(NameNode namenode) {
-    namenode.getNamesystem().getBlockManager().updateState();
-  }
-
-  /**
    * Get the internal RPC server instance.
    * @return rpc server
    */
@@ -68,12 +60,11 @@ public class NameNodeAdapter {
   /**
    * Return the datanode descriptor for the given datanode.
    */
-  public static DatanodeDescriptor getDatanode(NameNode namenode,
+  public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
       DatanodeID id) throws IOException {
-    FSNamesystem ns = namenode.getNamesystem();
     ns.readLock();
     try {
-      return ns.getDatanode(id);
+      return ns.getBlockManager().getDatanodeManager().getDatanode(id);
     } finally {
       ns.readUnlock();
     }

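Taken together with the hunks above, the adapter change makes the new ownership chain of HDFS-2191 visible: datanode lookups go FSNamesystem -> BlockManager -> DatanodeManager. A compact sketch (not part of the commit; "ns" and "id" stand for any FSNamesystem and DatanodeID):

    // Sketch only: datanodeMap now lives in DatanodeManager, so the descriptor
    // lookup is reached through BlockManager rather than a method on
    // FSNamesystem; callers still hold the namesystem read lock.
    final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
    final DatanodeDescriptor dd = dm.getDatanode(id);
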
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Wed Jul 27 05:46:52 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -61,13 +62,8 @@ public class TestDeadDatanode {
     FSNamesystem namesystem = cluster.getNamesystem();
     String state = alive ? "alive" : "dead";
     while (System.currentTimeMillis() < stopTime) {
-      namesystem.readLock();
-      DatanodeDescriptor dd;
-      try {
-        dd = namesystem.getDatanode(nodeID);
-      } finally {
-        namesystem.readUnlock();
-      }
+      final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
+          namesystem, nodeID);
       if (dd.isAlive == alive) {
         LOG.info("datanode " + nodeID + " is " + state);
         return;