Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/08/05 00:55:54 UTC

svn commit: r1154042 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hd...

Author: szetszwo
Date: Thu Aug  4 22:55:48 2011
New Revision: 1154042

URL: http://svn.apache.org/viewvc?rev=1154042&view=rev
Log:
HDFS-2108. Move datanode heartbeat handling from namenode package to blockmanagement package.

Added:
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
      - copied, changed from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
      - copied, changed from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
      - copied, changed from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
Removed:
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Thu Aug  4 22:55:48 2011
@@ -635,6 +635,9 @@ Trunk (unreleased changes)
     HDFS-2225. Refactor file management so it's not in classes which should
     be generic. (Ivan Kelly via todd)
 
+    HDFS-2108. Move datanode heartbeat handling from namenode package to
+    blockmanagement package.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Aug  4 22:55:48 2011
@@ -565,7 +565,6 @@ public interface ClientProtocol extends 
    * <li> [3] contains number of under replicated blocks in the system.</li>
    * <li> [4] contains number of blocks with a corrupt replica. </li>
    * <li> [5] contains number of blocks without any good replicas left. </li>
-   * <li> [5] contains number of blocks without any good replicas left. </li>
    * <li> [6] contains the total used space of the block pool. </li>
    * </ul>
    * Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of 
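
The hunk above drops a javadoc entry for index [5] that had been duplicated. As the
javadoc advises, callers should address the stats array through the named index
constants rather than hard-coded positions; a minimal sketch (assuming "namenode"
is a ClientProtocol proxy; this snippet is illustrative, not part of the commit):

    long[] stats = namenode.getStats();
    // Named constants instead of magic indices into the stats array.
    long capacity        = stats[ClientProtocol.GET_STATS_CAPACITY_IDX];
    long underReplicated = stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX];
    long corrupt         = stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
    long missing         = stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];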

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Aug  4 22:55:48 2011
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -46,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -95,11 +94,6 @@ public class BlockManager {
     return isBlockTokenEnabled;
   }
 
-  /** get the block key update interval */
-  public long getBlockKeyUpdateInterval() {
-    return blockKeyUpdateInterval;
-  }
-
   /** get the BlockTokenSecretManager */
   public BlockTokenSecretManager getBlockTokenSecretManager() {
     return blockTokenSecretManager;
@@ -140,7 +134,8 @@ public class BlockManager {
   public final BlocksMap blocksMap;
 
   private final DatanodeManager datanodeManager;
-  
+  private final HeartbeatManager heartbeatManager;
+
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
   
@@ -177,7 +172,7 @@ public class BlockManager {
   /** The maximum number of outgoing replication streams
    *  a given node should have at one time 
    */
-  public int maxReplicationStreams;
+  int maxReplicationStreams;
   /** Minimum copies needed or else write is disallowed */
   public final int minReplication;
   /** Default number of replicas */
@@ -217,22 +212,12 @@ public class BlockManager {
       setBlockToken(l);
     }
   }
-
-  /**
-   * Update access keys.
-   */
-  public void updateBlockKey() throws IOException {
-    this.blockTokenSecretManager.updateKeys();
-    synchronized (namesystem.heartbeats) {
-      for (DatanodeDescriptor nodeInfo : namesystem.heartbeats) {
-        nodeInfo.needKeyUpdate = true;
-      }
-    }
-  }
   
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     namesystem = fsn;
-    datanodeManager = new DatanodeManager(fsn, conf);
+    datanodeManager = new DatanodeManager(this, fsn, conf);
+    heartbeatManager = datanodeManager.getHeartbeatManager();
+
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
     blockplacement = BlockPlacementPolicy.getInstance(
         conf, namesystem, datanodeManager.getNetworkTopology());
@@ -387,6 +372,11 @@ public class BlockManager {
     getDatanodeManager().datanodeDump(out);
   }
 
+  /** @return maxReplicationStreams */
+  public int getMaxReplicationStreams() {
+    return maxReplicationStreams;
+  }
+
   /**
    * @param block
    * @return true if the block has minimum replicas
@@ -587,7 +577,8 @@ public class BlockManager {
       }
       final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
       final DatanodeDescriptor[] locations = uc.getExpectedLocations();
-      return namesystem.createLocatedBlock(uc, locations, pos, false);
+      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+      return new LocatedBlock(eb, locations, pos, false);
     }
 
     // get block locations
@@ -613,7 +604,8 @@ public class BlockManager {
           machines[j++] = d;
       }
     }
-    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
+    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+    return new LocatedBlock(eb, machines, pos, isCorrupt);
   }
 
   /**
@@ -685,8 +677,8 @@ public class BlockManager {
   }
 
    
-  /** Remove a datanode. */
-  public void removeDatanode(final DatanodeDescriptor node) {
+  /** Remove the blocks associated to the given datanode. */
+  void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
     final Iterator<? extends Block> it = node.getBlockIterator();
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
@@ -694,11 +686,6 @@ public class BlockManager {
 
     node.resetBlocks();
     removeFromInvalidates(node.getStorageID());
-    datanodeManager.getNetworkTopology().remove(node);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("remove datanode " + node.getName());
-    }
   }
   
   private void removeFromInvalidates(String storageID, Block block) {
@@ -887,7 +874,7 @@ public class BlockManager {
    * @param nodesToProcess number of datanodes to schedule deletion work
    * @return total number of block for deletion
    */
-  public int computeInvalidateWork(int nodesToProcess) {
+  int computeInvalidateWork(int nodesToProcess) {
     int numOfNodes = recentInvalidateSets.size();
     nodesToProcess = Math.min(numOfNodes, nodesToProcess);
 
@@ -927,7 +914,7 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  public int computeReplicationWork(int blocksToProcess) throws IOException {
+  private int computeReplicationWork(int blocksToProcess) throws IOException {
     // Choose the blocks to be replicated
     List<List<Block>> blocksToReplicate =
       chooseUnderReplicatedBlocks(blocksToProcess);
@@ -2047,7 +2034,7 @@ public class BlockManager {
    * On stopping decommission, check if the node has excess replicas.
    * If there are any excess replicas, call processOverReplicatedBlock()
    */
-  private void processOverReplicatedBlocksOnReCommission(
+  void processOverReplicatedBlocksOnReCommission(
       final DatanodeDescriptor srcNode) {
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     while(it.hasNext()) {
@@ -2145,6 +2132,16 @@ public class BlockManager {
     return blocksMap.getStoredBlock(block);
   }
 
+
+  /** Should the access keys be updated? */
+  boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
+    final boolean b = isBlockTokenEnabled && blockKeyUpdateInterval < updateTime;
+    if (b) {
+      blockTokenSecretManager.updateKeys();
+    }
+    return b;
+  }
+
   /* updates a block in under replication queue */
   public void updateNeededReplications(Block block, int curReplicasDelta,
       int expectedReplicasDelta) {
@@ -2356,57 +2353,11 @@ public class BlockManager {
   }
 
   /**
-   * Change, if appropriate, the admin state of a datanode to 
-   * decommission completed. Return true if decommission is complete.
-   */
-  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
-    // Check to see if all blocks in this decommissioned
-    // node has reached their target replication factor.
-    if (node.isDecommissionInProgress()) {
-      if (!isReplicationInProgress(node)) {
-        node.setDecommissioned();
-        LOG.info("Decommission complete for node " + node.getName());
-      }
-    }
-    return node.isDecommissioned();
-  }
-
-  /** Start decommissioning the specified datanode. */
-  void startDecommission(DatanodeDescriptor node) throws IOException {
-    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
-          node.numBlocks() +  " blocks.");
-      synchronized (namesystem.heartbeats) {
-        namesystem.updateStats(node, false);
-        node.startDecommission();
-        namesystem.updateStats(node, true);
-      }
-      node.decommissioningStatus.setStartTime(now());
-      
-      // all the blocks that reside on this node have to be replicated.
-      checkDecommissionStateInternal(node);
-    }
-  }
-
-  /** Stop decommissioning the specified datanodes. */
-  void stopDecommission(DatanodeDescriptor node) throws IOException {
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      LOG.info("Stop Decommissioning node " + node.getName());
-      synchronized (namesystem.heartbeats) {
-        namesystem.updateStats(node, false);
-        node.stopDecommission();
-        namesystem.updateStats(node, true);
-      }
-      processOverReplicatedBlocksOnReCommission(node);
-    }
-  }
-
-  /**
    * Periodically calls computeReplicationWork().
    */
   private class ReplicationMonitor implements Runnable {
-    static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
-    static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
+    private static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+    private static final int REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
 
     @Override
     public void run() {
@@ -2439,8 +2390,6 @@ public class BlockManager {
    */
   int computeDatanodeWork() throws IOException {
     int workFound = 0;
-    int blocksToProcess = 0;
-    int nodesToProcess = 0;
     // Blocks should not be replicated or removed if in safe mode.
     // It's OK to check safe mode here w/o holding lock, in the worst
     // case extra replications will be scheduled, and these will get
@@ -2448,11 +2397,11 @@ public class BlockManager {
     if (namesystem.isInSafeMode())
       return workFound;
 
-    synchronized (namesystem.heartbeats) {
-      blocksToProcess = (int) (namesystem.heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
-      nodesToProcess = (int) Math.ceil((double) namesystem.heartbeats.size()
-          * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
-    }
+    final int numlive = heartbeatManager.getLiveDatanodeCount();
+    final int blocksToProcess = numlive
+        * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
+    final int nodesToProcess = (int) Math.ceil(numlive
+        * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
 
     workFound = this.computeReplicationWork(blocksToProcess);
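
For concreteness, the work sizing in computeDatanodeWork() above amounts to the
following (a worked sketch with an illustrative cluster size, using the
ReplicationMonitor constants from this hunk; not committed code):

    int numlive = 50;                  // live datanodes from the HeartbeatManager
    int blocksToProcess = numlive * 2;                           // MULTIPLIER = 2 -> 100 blocks to replicate
    int nodesToProcess  = (int) Math.ceil(numlive * 32 / 100.0); // 32 pct -> 16 nodes to invalidate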
 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Thu Aug  4 22:55:48 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Thu Aug  4 22:55:48 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -56,7 +59,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -75,7 +77,10 @@ import org.apache.hadoop.util.Reflection
 public class DatanodeManager {
   static final Log LOG = LogFactory.getLog(DatanodeManager.class);
 
-  final FSNamesystem namesystem;
+  private final FSNamesystem namesystem;
+  private final BlockManager blockManager;
+
+  private final HeartbeatManager heartbeatManager;
 
   /**
    * Stores the datanode -> block map.  
@@ -117,9 +122,14 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
   
-  DatanodeManager(final FSNamesystem namesystem, final Configuration conf
+  DatanodeManager(final BlockManager blockManager,
+      final FSNamesystem namesystem, final Configuration conf
       ) throws IOException {
     this.namesystem = namesystem;
+    this.blockManager = blockManager;
+
+    this.heartbeatManager = new HeartbeatManager(namesystem, conf);
+
     this.hostsReader = new HostsFileReader(
         conf.get(DFSConfigKeys.DFS_HOSTS, ""),
         conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
@@ -158,17 +168,30 @@ public class DatanodeManager {
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY, 
                     DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
     decommissionthread.start();
+
+    heartbeatManager.activate(conf);
   }
 
   void close() {
     if (decommissionthread != null) decommissionthread.interrupt();
+    heartbeatManager.close();
   }
 
   /** @return the network topology. */
   public NetworkTopology getNetworkTopology() {
     return networktopology;
   }
-  
+
+  /** @return the heartbeat manager. */
+  HeartbeatManager getHeartbeatManager() {
+    return heartbeatManager;
+  }
+
+  /** @return the datanode statistics. */
+  public DatanodeStatistics getDatanodeStatistics() {
+    return heartbeatManager;
+  }
+
   /** Sort the located blocks by the distance to the target host. */
   public void sortLocatedBlocks(final String targethost,
       final List<LocatedBlock> locatedblocks) {
@@ -231,9 +254,44 @@ public class DatanodeManager {
     }
   }
 
+  /**
+   * Remove a datanode descriptor.
+   * @param nodeInfo datanode descriptor.
+   */
+  private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    assert namesystem.hasWriteLock();
+    heartbeatManager.removeDatanode(nodeInfo);
+    blockManager.removeBlocksAssociatedTo(nodeInfo);
+    networktopology.remove(nodeInfo);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("remove datanode " + nodeInfo.getName());
+    }
+    namesystem.checkSafeMode();
+  }
+
+  /**
+   * Remove a datanode
+   * @throws UnregisteredNodeException 
+   */
+  public void removeDatanode(final DatanodeID node
+      ) throws UnregisteredNodeException {
+    namesystem.writeLock();
+    try {
+      final DatanodeDescriptor descriptor = getDatanode(node);
+      if (descriptor != null) {
+        removeDatanode(descriptor);
+      } else {
+        NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
+                                     + node.getName() + " does not exist");
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
   /** Remove a dead datanode. */
-  public void removeDeadDatanode(final DatanodeID nodeID) {
-    synchronized(namesystem.heartbeats) {
+  void removeDeadDatanode(final DatanodeID nodeID) {
       synchronized(datanodeMap) {
         DatanodeDescriptor d;
         try {
@@ -244,14 +302,13 @@ public class DatanodeManager {
         if (d != null && isDatanodeDead(d)) {
           NameNode.stateChangeLog.info(
               "BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
-          namesystem.removeDatanode(d);
+          removeDatanode(d);
         }
       }
-    }
   }
 
   /** Is the datanode dead? */
-  public boolean isDatanodeDead(DatanodeDescriptor node) {
+  boolean isDatanodeDead(DatanodeDescriptor node) {
     return (node.getLastUpdate() <
             (Util.now() - heartbeatExpireInterval));
   }
@@ -423,11 +480,48 @@ public class DatanodeManager {
     throws IOException {
     // If the registered node is in exclude list, then decommission it
     if (inExcludedHostsList(nodeReg, ipAddr)) {
-      namesystem.getBlockManager().startDecommission(nodeReg);
+      startDecommission(nodeReg);
+    }
+  }
+
+  /**
+   * Change, if appropriate, the admin state of a datanode to 
+   * decommission completed. Return true if decommission is complete.
+   */
+  boolean checkDecommissionState(DatanodeDescriptor node) {
+    // Check to see if all blocks in this decommissioned
+    // node have reached their target replication factor.
+    if (node.isDecommissionInProgress()) {
+      if (!blockManager.isReplicationInProgress(node)) {
+        node.setDecommissioned();
+        LOG.info("Decommission complete for node " + node.getName());
+      }
+    }
+    return node.isDecommissioned();
+  }
+
+  /** Start decommissioning the specified datanode. */
+  private void startDecommission(DatanodeDescriptor node) throws IOException {
+    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
+          node.numBlocks() +  " blocks.");
+      heartbeatManager.startDecommission(node);
+      node.decommissioningStatus.setStartTime(now());
+      
+      // all the blocks that reside on this node have to be replicated.
+      checkDecommissionState(node);
+    }
+  }
+
+  /** Stop decommissioning the specified datanodes. */
+  void stopDecommission(DatanodeDescriptor node) throws IOException {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stop Decommissioning node " + node.getName());
+      heartbeatManager.stopDecommission(node);
+      blockManager.processOverReplicatedBlocksOnReCommission(node);
     }
   }
 
-  
   /**
    * Generate new storage ID.
    * 
@@ -483,7 +577,7 @@ public class DatanodeManager {
                         + "node from name: " + nodeN.getName());
       // nodeN previously served a different data storage, 
       // which is not served by anybody anymore.
-      namesystem.removeDatanode(nodeN);
+      removeDatanode(nodeN);
       // physically remove node from datanodeMap
       wipeDatanode(nodeN);
       nodeN = null;
@@ -525,14 +619,7 @@ public class DatanodeManager {
       getNetworkTopology().add(nodeS);
         
       // also treat the registration message as a heartbeat
-      synchronized(namesystem.heartbeats) {
-        if( !namesystem.heartbeats.contains(nodeS)) {
-          namesystem.heartbeats.add(nodeS);
-          //update its timestamp
-          nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
-          nodeS.isAlive = true;
-        }
-      }
+      heartbeatManager.register(nodeS);
       checkDecommissioning(nodeS, dnAddress);
       return;
     } 
@@ -556,12 +643,9 @@ public class DatanodeManager {
     checkDecommissioning(nodeDescr, dnAddress);
     
     // also treat the registration message as a heartbeat
-    synchronized(namesystem.heartbeats) {
-      namesystem.heartbeats.add(nodeDescr);
-      nodeDescr.isAlive = true;
-      // no need to update its timestamp
-      // because its is done when the descriptor is created
-    }
+    // no need to update its timestamp
+    // because it is done when the descriptor is created
+    heartbeatManager.addDatanode(nodeDescr);
   }
 
   /** Reread include/exclude files. */
@@ -589,12 +673,12 @@ public class DatanodeManager {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!inHostsList(node, null)) {
-        node.setDisallowed(true);  // case 2.
+        node.setDisallowed(true); // case 2.
       } else {
         if (inExcludedHostsList(node, null)) {
-          namesystem.getBlockManager().startDecommission(node);   // case 3.
+          startDecommission(node); // case 3.
         } else {
-          namesystem.getBlockManager().stopDecommission(node);   // case 4.
+          stopDecommission(node); // case 4.
         }
       }
     }
@@ -712,7 +796,7 @@ public class DatanodeManager {
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int maxTransfers, int failedVolumes
       ) throws IOException {
-    synchronized (namesystem.heartbeats) {
+    synchronized (heartbeatManager) {
       synchronized (datanodeMap) {
         DatanodeDescriptor nodeinfo = null;
         try {
@@ -731,10 +815,8 @@ public class DatanodeManager {
           return new DatanodeCommand[]{DatanodeCommand.REGISTER};
         }
 
-        namesystem.updateStats(nodeinfo, false);
-        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-            xceiverCount, failedVolumes);
-        namesystem.updateStats(nodeinfo, true);
+        heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
+            remaining, blockPoolUsed, xceiverCount, failedVolumes);
         
         //check lease recovery
         BlockInfoUnderConstruction[] blocks = nodeinfo
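
Note the lock ordering preserved by the last hunk: handleHeartbeat() still
acquires the heartbeat lock before the datanodeMap lock; only the lock object
changes from namesystem.heartbeats to the HeartbeatManager instance. A schematic
sketch of the convention (illustrative, not committed code):

    // Heartbeat lock first, then datanodeMap, so per-node state and the
    // aggregate statistics kept by HeartbeatManager change atomically.
    synchronized (heartbeatManager) {
      synchronized (datanodeMap) {
        // look up the DatanodeDescriptor and apply the heartbeat update
      }
    }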

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1154042&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Thu Aug  4 22:55:48 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+
+/** Datanode statistics */
+public interface DatanodeStatistics {
+
+  /** @return the total capacity */
+  public long getCapacityTotal();
+
+  /** @return the used capacity */
+  public long getCapacityUsed();
+
+  /** @return the percentage of the used capacity over the total capacity. */
+  public float getCapacityUsedPercent();
+
+  /** @return the remaining capacity */
+  public long getCapacityRemaining();
+
+  /** @return the percentage of the remaining capacity over the total capacity. */
+  public float getCapacityRemainingPercent();
+
+  /** @return the block pool used. */
+  public long getBlockPoolUsed();
+
+  /** @return the percentage of the block pool used space over the total capacity. */
+  public float getPercentBlockPoolUsed();
+
+  /** @return the xceiver count */
+  public int getXceiverCount();
+
+  /**
+   * @return the total used space by data nodes for non-DFS purposes
+   * such as storing temporary files on the local file system
+   */
+  public long getCapacityUsedNonDFS();
+
+  /** The same as {@link ClientProtocol#getStats()}.
+   * The block related entries are set to -1.
+   */
+  public long[] getStats();
+}
\ No newline at end of file
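
Since getStats() leaves the block-related entries at -1, a caller is expected to
fill them in; the FSNamesystem hunk later in this commit does exactly that. A
condensed sketch of that pattern:

    // Complete the -1 placeholders from DatanodeStatistics#getStats()
    // with the block counts, as FSNamesystem#getStats() does below.
    final long[] stats = datanodeStatistics.getStats();
    stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
    stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]   = getCorruptReplicaBlocks();
    stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]   = getMissingBlocksCount();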

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Thu Aug  4 22:55:48 2011
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.util.CyclicIteration;
 
 /**
  * Manage node decommissioning.
@@ -35,11 +34,9 @@ class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
 
   private final FSNamesystem fsnamesystem;
-  private final BlockManager blockManager;
 
-  DecommissionManager(FSNamesystem namesystem) {
+  DecommissionManager(final FSNamesystem namesystem) {
     this.fsnamesystem = namesystem;
-    this.blockManager = fsnamesystem.getBlockManager();
   }
 
   /** Periodically check decommission status. */
@@ -81,16 +78,16 @@ class DecommissionManager {
     }
     
     private void check() {
+      final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager();
       int count = 0;
       for(Map.Entry<String, DatanodeDescriptor> entry
-          : blockManager.getDatanodeManager().getDatanodeCyclicIteration(
-              firstkey)) {
+          : dm.getDatanodeCyclicIteration(firstkey)) {
         final DatanodeDescriptor d = entry.getValue();
         firstkey = entry.getKey();
 
         if (d.isDecommissionInProgress()) {
           try {
-            blockManager.checkDecommissionStateInternal(d);
+            dm.checkDecommissionState(d);
           } catch(Exception e) {
             LOG.warn("entry=" + entry, e);
           }

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1154042&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Thu Aug  4 22:55:48 2011
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.util.Daemon;
+
+/**
+ * Manage the heartbeats received from datanodes.
+ * The datanode list and statistics are synchronized
+ * by the heartbeat manager lock.
+ */
+class HeartbeatManager implements DatanodeStatistics {
+  static final Log LOG = LogFactory.getLog(HeartbeatManager.class);
+
+  /**
+   * Stores a subset of the datanodeMap in DatanodeManager,
+   * containing nodes that are considered alive.
+   * The HeartbeatMonitor periodically checks for out-dated entries,
+   * and removes them from the list.
+   * It is synchronized by the heartbeat manager lock.
+   */
+  private final List<DatanodeDescriptor> datanodes = new ArrayList<DatanodeDescriptor>();
+
+  /** Statistics, which are synchronized by the heartbeat manager lock. */
+  private final Stats stats = new Stats();
+
+  /** The time period to check for expired datanodes */
+  private final long heartbeatRecheckInterval;
+  /** Heartbeat monitor thread */
+  private final Daemon heartbeatThread = new Daemon(new Monitor());
+
+  final FSNamesystem namesystem;
+
+  HeartbeatManager(final FSNamesystem namesystem, final Configuration conf) {
+    this.heartbeatRecheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
+
+    this.namesystem = namesystem;
+  }
+
+  void activate(Configuration conf) {
+    heartbeatThread.start();
+  }
+
+  void close() {
+    heartbeatThread.interrupt();
+  }
+  
+  synchronized int getLiveDatanodeCount() {
+    return datanodes.size();
+  }
+
+  @Override
+  public synchronized long getCapacityTotal() {
+    return stats.capacityTotal;
+  }
+
+  @Override
+  public synchronized long getCapacityUsed() {
+    return stats.capacityUsed;
+  }
+
+  @Override
+  public synchronized float getCapacityUsedPercent() {
+    return DFSUtil.getPercentUsed(stats.capacityUsed, stats.capacityTotal);
+  }
+
+  @Override
+  public synchronized long getCapacityRemaining() {
+    return stats.capacityRemaining;
+  }
+
+  @Override
+  public synchronized float getCapacityRemainingPercent() {
+    return DFSUtil.getPercentRemaining(
+        stats.capacityRemaining, stats.capacityTotal);
+  }
+
+  @Override
+  public synchronized long getBlockPoolUsed() {
+    return stats.blockPoolUsed;
+  }
+
+  @Override
+  public synchronized float getPercentBlockPoolUsed() {
+    return DFSUtil.getPercentUsed(stats.blockPoolUsed, stats.capacityTotal);
+  }
+
+  @Override
+  public synchronized long getCapacityUsedNonDFS() {
+    final long nonDFSUsed = stats.capacityTotal
+        - stats.capacityRemaining - stats.capacityUsed;
+    return nonDFSUsed < 0L? 0L : nonDFSUsed;
+  }
+
+  @Override
+  public synchronized int getXceiverCount() {
+    return stats.xceiverCount;
+  }
+
+  @Override
+  public synchronized long[] getStats() {
+    return new long[] {getCapacityTotal(),
+                       getCapacityUsed(),
+                       getCapacityRemaining(),
+                       -1L,
+                       -1L,
+                       -1L,
+                       getBlockPoolUsed()};
+  }
+
+  synchronized void register(final DatanodeDescriptor d) {
+    if (!datanodes.contains(d)) {
+      addDatanode(d);
+
+      //update its timestamp
+      d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+    }
+  }
+
+  synchronized DatanodeDescriptor[] getDatanodes() {
+    return datanodes.toArray(new DatanodeDescriptor[datanodes.size()]);
+  }
+
+  synchronized void addDatanode(final DatanodeDescriptor d) {
+    datanodes.add(d);
+    d.isAlive = true;
+  }
+
+  synchronized void removeDatanode(DatanodeDescriptor node) {
+    if (node.isAlive) {
+      stats.subtract(node);
+      datanodes.remove(node);
+      node.isAlive = false;
+    }
+  }
+
+  synchronized void updateHeartbeat(final DatanodeDescriptor node,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int failedVolumes) {
+    stats.subtract(node);
+    node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
+        xceiverCount, failedVolumes);
+    stats.add(node);
+  }
+
+  synchronized void startDecommission(final DatanodeDescriptor node) {
+    stats.subtract(node);
+    node.startDecommission();
+    stats.add(node);
+  }
+
+  synchronized void stopDecommission(final DatanodeDescriptor node) {
+    stats.subtract(node);
+    node.stopDecommission();
+    stats.add(node);
+  }
+  
+  /**
+   * Check if there are any expired heartbeats, and if so,
+   * whether any blocks have to be re-replicated.
+   * While removing dead datanodes, make sure that only one datanode is marked
+   * dead at a time within the synchronized section. Otherwise, a cascading
+   * effect causes more datanodes to be declared dead.
+   */
+  void heartbeatCheck() {
+    final DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+    // It's OK to check safe mode w/o taking the lock here, we re-check
+    // for safe mode after taking the lock before removing a datanode.
+    if (namesystem.isInSafeMode()) {
+      return;
+    }
+    boolean allAlive = false;
+    while (!allAlive) {
+      // locate the first dead node.
+      DatanodeID dead = null;
+      synchronized(this) {
+        for (DatanodeDescriptor d : datanodes) {
+          if (dm.isDatanodeDead(d)) {
+            namesystem.incrExpiredHeartbeats();
+            dead = d;
+            break;
+          }
+        }
+      }
+
+      allAlive = dead == null;
+      if (!allAlive) {
+        // acquire the fsnamesystem lock, and then remove the dead node.
+        namesystem.writeLock();
+        if (namesystem.isInSafeMode()) {
+          return;
+        }
+        try {
+          synchronized(this) {
+            dm.removeDeadDatanode(dead);
+          }
+        } finally {
+          namesystem.writeUnlock();
+        }
+      }
+    }
+  }
+
+
+  /** Periodically check heartbeat and update block key */
+  private class Monitor implements Runnable {
+    private long lastHeartbeatCheck;
+    private long lastBlockKeyUpdate;
+
+    @Override
+    public void run() {
+      while(namesystem.isRunning()) {
+        try {
+          final long now = Util.now();
+          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
+            heartbeatCheck();
+            lastHeartbeatCheck = now;
+          }
+          if (namesystem.getBlockManager().shouldUpdateBlockKey(
+              now - lastBlockKeyUpdate)) {
+            synchronized(HeartbeatManager.this) {
+              for(DatanodeDescriptor d : datanodes) {
+                d.needKeyUpdate = true;
+              }
+            }
+            lastBlockKeyUpdate = now;
+          }
+        } catch (Exception e) {
+          LOG.error("Exception while checking heartbeat", e);
+        }
+        try {
+          Thread.sleep(5000);  // 5 seconds
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+  }
+
+  /** Datanode statistics.
+   * For decommissioning/decommissioned nodes, only used capacity is counted.
+   */
+  private static class Stats {
+    private long capacityTotal = 0L;
+    private long capacityUsed = 0L;
+    private long capacityRemaining = 0L;
+    private long blockPoolUsed = 0L;
+    private int xceiverCount = 0;
+
+    private void add(final DatanodeDescriptor node) {
+      capacityUsed += node.getDfsUsed();
+      blockPoolUsed += node.getBlockPoolUsed();
+      xceiverCount += node.getXceiverCount();
+      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        capacityTotal += node.getCapacity();
+        capacityRemaining += node.getRemaining();
+      } else {
+        capacityTotal += node.getDfsUsed();
+      }
+    }
+
+    private void subtract(final DatanodeDescriptor node) {
+      capacityUsed -= node.getDfsUsed();
+      blockPoolUsed -= node.getBlockPoolUsed();
+      xceiverCount -= node.getXceiverCount();
+      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        capacityTotal -= node.getCapacity();
+        capacityRemaining -= node.getRemaining();
+      } else {
+        capacityTotal -= node.getDfsUsed();
+      }
+    }
+  }
+}
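
The Stats comment above notes that for decommissioning/decommissioned nodes only
used capacity is counted. A worked example with illustrative numbers (not taken
from the commit):

    // A node with capacity=100, dfsUsed=40, remaining=50.
    //
    // While live, Stats.add() contributes:
    //   capacityTotal += 100; capacityUsed += 40; capacityRemaining += 50;
    //   getCapacityUsedNonDFS() then sees 100 - 50 - 40 = 10.
    //
    // startDecommission() re-adds the node with only used capacity counted:
    //   capacityTotal += 40; capacityUsed += 40; capacityRemaining += 0;
    // so a draining node no longer advertises free space to the cluster.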

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Aug  4 22:55:48 2011
@@ -86,7 +86,6 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -97,6 +96,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -207,9 +207,6 @@ public class FSNamesystem implements FSC
   private PermissionStatus defaultPermission;
   // FSNamesystemMetrics counter variables
   @Metric private MutableCounterInt expiredHeartbeats;
-  private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
-  private long blockPoolUsed = 0L;
-  private int totalLoad = 0;
   
   // Scan interval is not configurable.
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@@ -221,24 +218,17 @@ public class FSNamesystem implements FSC
   //
   public FSDirectory dir;
   private BlockManager blockManager;
-  
+  private DatanodeStatistics datanodeStatistics;
+
   // Block pool ID used by this namenode
   String blockPoolId;
 
-  /**
-   * Stores a subset of datanodeMap, containing nodes that are considered alive.
-   * The HeartbeatMonitor periodically checks for out-dated entries,
-   * and removes them from the list.
-   */
-  public ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
-
   public LeaseManager leaseManager = new LeaseManager(this); 
 
   //
   // Threaded object that checks to see if we have been
   // getting heartbeats from all clients. 
   //
-  Daemon hbthread = null;   // HeartbeatMonitor thread
   public Daemon lmthread = null;   // LeaseMonitor thread
   Daemon smmthread = null;  // SafeModeMonitor thread
   
@@ -248,9 +238,6 @@ public class FSNamesystem implements FSC
   private volatile boolean fsRunning = true;
   long systemStart = 0;
 
-  // heartbeatRecheckInterval is how often namenode checks for expired datanodes
-  private long heartbeatRecheckInterval;
-
   //resourceRecheckInterval is how often namenode checks for the disk space availability
   private long resourceRecheckInterval;
 
@@ -303,6 +290,7 @@ public class FSNamesystem implements FSC
     checkAvailableResources();
     this.systemStart = now();
     this.blockManager = new BlockManager(this, conf);
+    this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
     this.fsLock = new ReentrantReadWriteLock(true); // fair locking
     setConfigurationParameters(conf);
     dtSecretManager = createDelegationTokenSecretManager(conf);
@@ -333,10 +321,7 @@ public class FSNamesystem implements FSC
   void activate(Configuration conf) throws IOException {
     setBlockTotal();
     blockManager.activate(conf);
-    this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(leaseManager.new Monitor());
-    
-    hbthread.start();
     lmthread.start();
 
     this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
@@ -463,10 +448,6 @@ public class FSNamesystem implements FSC
                                               DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
-
-    this.heartbeatRecheckInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
     
     this.serverDefaults = new FsServerDefaults(
         conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
@@ -512,7 +493,6 @@ public class FSNamesystem implements FSC
     fsRunning = false;
     try {
       if (blockManager != null) blockManager.close();
-      if (hbthread != null) hbthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
       if (dtSecretManager != null) dtSecretManager.stopThreads();
       if (nnrmthread != null) nnrmthread.interrupt();
@@ -622,7 +602,7 @@ public class FSNamesystem implements FSC
    * Set permissions for an existing file.
    * @throws IOException
    */
-  public void setPermission(String src, FsPermission permission)
+  void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
@@ -651,7 +631,7 @@ public class FSNamesystem implements FSC
    * Set owner for an existing file.
    * @throws IOException
    */
-  public void setOwner(String src, String username, String group)
+  void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
@@ -818,12 +798,6 @@ public class FSNamesystem implements FSC
           lastBlock, last.isComplete());
     }
   }
-
-  /** Create a LocatedBlock. */
-  public LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
-      final long offset, final boolean corrupt) throws IOException {
-    return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
-  }
   
 
   /**
@@ -1018,7 +992,7 @@ public class FSNamesystem implements FSC
   /**
    * Create a symbolic link.
    */
-  public void createSymlink(String target, String link,
+  void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
@@ -1988,7 +1962,7 @@ public class FSNamesystem implements FSC
   * @see ClientProtocol#delete(String, boolean) for detailed description and 
    * description of exceptions
    */
-    public boolean delete(String src, boolean recursive)
+    boolean delete(String src, boolean recursive)
         throws AccessControlException, SafeModeException,
                UnresolvedLinkException, IOException {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2118,7 +2092,7 @@ public class FSNamesystem implements FSC
   /**
    * Create all the necessary directories
    */
-  public boolean mkdirs(String src, PermissionStatus permissions,
+  boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
     boolean status = false;
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2536,7 +2510,7 @@ public class FSNamesystem implements FSC
    * @throws UnresolvedLinkException if symbolic link is encountered
    * @throws IOException if other I/O error occurred
    */
-  public DirectoryListing getListing(String src, byte[] startAfter,
+  DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
@@ -2606,7 +2580,7 @@ public class FSNamesystem implements FSC
    * @see #registerDatanode(DatanodeRegistration)
    * @return registration ID
    */
-  public String getRegistrationID() {
+  String getRegistrationID() {
     return Storage.getRegistrationID(dir.fsImage.getStorage());
   }
 
@@ -2627,7 +2601,8 @@ public class FSNamesystem implements FSC
         throws IOException {
     readLock();
     try {
-      final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress;
+      final int maxTransfer = blockManager.getMaxReplicationStreams()
+          - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
           xceiverCount, maxTransfer, failedVolumes);
@@ -2655,35 +2630,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-  public void updateStats(DatanodeDescriptor node, boolean isAdded) {
-    //
-    // The statistics are protected by the heartbeat lock
-    // For decommissioning/decommissioned nodes, only used capacity
-    // is counted.
-    //
-    assert(Thread.holdsLock(heartbeats));
-    if (isAdded) {
-      capacityUsed += node.getDfsUsed();
-      blockPoolUsed += node.getBlockPoolUsed();
-      totalLoad += node.getXceiverCount();
-      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
-        capacityTotal += node.getCapacity();
-        capacityRemaining += node.getRemaining();
-      } else {
-        capacityTotal += node.getDfsUsed();
-      }
-    } else {
-      capacityUsed -= node.getDfsUsed();
-      blockPoolUsed -= node.getBlockPoolUsed();
-      totalLoad -= node.getXceiverCount();
-      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
-        capacityTotal -= node.getCapacity();
-        capacityRemaining -= node.getRemaining();
-      } else {
-        capacityTotal -= node.getDfsUsed();
-      }
-    }
-  }
 
   /**
    * Returns whether or not there were available resources at the last check of
@@ -2735,86 +2681,7 @@ public class FSNamesystem implements FSC
       }
     }
   }
-
-
-  /**
-   * Periodically calls heartbeatCheck() and updateBlockKey()
-   */
-  class HeartbeatMonitor implements Runnable {
-    private long lastHeartbeatCheck;
-    private long lastBlockKeyUpdate;
-    /**
-     */
-    public void run() {
-      while (fsRunning) {
-        try {
-          long now = now();
-          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
-            heartbeatCheck();
-            lastHeartbeatCheck = now;
-          }
-          if (blockManager.isBlockTokenEnabled()
-              && (lastBlockKeyUpdate + blockManager.getBlockKeyUpdateInterval() < now)) {
-            blockManager.updateBlockKey();
-            lastBlockKeyUpdate = now;
-          }
-        } catch (Exception e) {
-          FSNamesystem.LOG.error("Exception while checking heartbeat", e);
-        }
-        try {
-          Thread.sleep(5000);  // 5 seconds
-        } catch (InterruptedException ie) {
-        }
-      }
-    }
-  }
-
- 
-  public void setNodeReplicationLimit(int limit) {
-    blockManager.maxReplicationStreams = limit;
-  }
-
-  /**
-   * Remove a datanode descriptor.
-   * @param nodeID datanode ID.
-   * @throws UnregisteredNodeException 
-   */
-  public void removeDatanode(final DatanodeID nodeID
-      ) throws UnregisteredNodeException {
-    writeLock();
-    try {
-      DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager(
-          ).getDatanode(nodeID);
-      if (nodeInfo != null) {
-        removeDatanode(nodeInfo);
-      } else {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
-                                     + nodeID.getName() + " does not exist");
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
   
-  /**
-   * Remove a datanode descriptor.
-   * @param nodeInfo datanode descriptor.
-   */
-  public void removeDatanode(DatanodeDescriptor nodeInfo) {
-    assert hasWriteLock();
-    synchronized (heartbeats) {
-      if (nodeInfo.isAlive) {
-        updateStats(nodeInfo, false);
-        heartbeats.remove(nodeInfo);
-        nodeInfo.isAlive = false;
-      }
-    }
-
-    blockManager.removeDatanode(nodeInfo);
-
-    checkSafeMode();
-  }
-
   FSImage getFSImage() {
     return dir.fsImage;
   }
@@ -2822,61 +2689,12 @@ public class FSNamesystem implements FSC
   FSEditLog getEditLog() {
     return getFSImage().getEditLog();
   }
-
-  /**
-   * Check if there are any expired heartbeats, and if so,
-   * whether any blocks have to be re-replicated.
-   * While removing dead datanodes, make sure that only one datanode is marked
-   * dead at a time within the synchronized section. Otherwise, a cascading
-   * effect causes more datanodes to be declared dead.
-   */
-  void heartbeatCheck() {
-    final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager();
-    // It's OK to check safe mode w/o taking the lock here, we re-check
-    // for safe mode after taking the lock before removing a datanode.
-    if (isInSafeMode()) {
-      return;
-    }
-    boolean allAlive = false;
-    while (!allAlive) {
-      boolean foundDead = false;
-      DatanodeID nodeID = null;
-
-      // locate the first dead node.
-      synchronized(heartbeats) {
-        for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
-             it.hasNext();) {
-          DatanodeDescriptor nodeInfo = it.next();
-          if (datanodeManager.isDatanodeDead(nodeInfo)) {
-            expiredHeartbeats.incr();
-            foundDead = true;
-            nodeID = nodeInfo;
-            break;
-          }
-        }
-      }
-
-      // acquire the fsnamesystem lock, and then remove the dead node.
-      if (foundDead) {
-        writeLock();
-        if (isInSafeMode()) {
-          return;
-        }
-        try {
-          datanodeManager.removeDeadDatanode(nodeID);
-        } finally {
-          writeUnlock();
-        }
-      }
-      allAlive = !foundDead;
-    }
-  }
     
   /**
    * The given node is reporting all its blocks.  Use this info to 
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
-  public void processReport(DatanodeID nodeID, String poolId,
+  void processReport(DatanodeID nodeID, String poolId,
       BlockListAsLongs newReport) throws IOException {
     long startTime, endTime;
 
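[Reviewer note: the dead-node scan likewise moves wholesale into HeartbeatManager. A sketch of the relocated loop, assuming HeartbeatManager keeps the live descriptors in a field (here "datanodes") and that the new FSNamesystem.incrExpiredHeartbeats() shown in the next hunk replaces the direct metric update; names not visible in this diff are guesses, and the safe-mode re-check is folded into the try block so the write lock is always released:

    // Sketch only: assumed shape of HeartbeatManager.heartbeatCheck().
    void heartbeatCheck() {
      final DatanodeManager dm = blockManager.getDatanodeManager();
      // It's OK to check safe mode without the lock here; it is
      // re-checked under the write lock before any removal.
      if (namesystem.isInSafeMode()) {
        return;
      }
      boolean allAlive = false;
      while (!allAlive) {
        DatanodeID dead = null;
        // Locate the first dead node while holding only the monitor lock.
        synchronized (this) {
          for (DatanodeDescriptor d : datanodes) {   // assumed field
            if (dm.isDatanodeDead(d)) {
              namesystem.incrExpiredHeartbeats();    // added in next hunk
              dead = d;
              break;
            }
          }
        }
        // Acquire the namesystem lock, then remove the dead node --
        // one node per pass, to avoid a cascading effect that would
        // declare more datanodes dead.
        if (dead != null) {
          namesystem.writeLock();
          try {
            if (namesystem.isInSafeMode()) {
              return;
            }
            dm.removeDeadDatanode(dead);
          } finally {
            namesystem.writeUnlock();
          }
        }
        allAlive = dead == null;
      }
    }
]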
@@ -3057,15 +2875,18 @@ public class FSNamesystem implements FSC
     return blockManager.getMissingBlocksCount();
   }
   
+  /** Increment expired heartbeat counter. */
+  public void incrExpiredHeartbeats() {
+    expiredHeartbeats.incr();
+  }
+
+  /** @see ClientProtocol#getStats() */
   long[] getStats() {
-    synchronized(heartbeats) {
-      return new long[] {this.capacityTotal, this.capacityUsed, 
-                         this.capacityRemaining,
-                         getUnderReplicatedBlocks(),
-                         getCorruptReplicaBlocks(),
-                         getMissingBlocksCount(),
-                         getBlockPoolUsedSpace()};
-    }
+    final long[] stats = datanodeStatistics.getStats();
+    stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
+    stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
+    stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
+    return stats;
   }
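[Reviewer note: the hand-built array gives way to index constants; ClientProtocol.java is also modified by this commit, presumably to define them. Only the three names below actually appear in this diff -- the remaining names and all of the values are assumptions, inferred from the order of the array the old getStats() constructed (capacity, used, remaining, under-replicated, corrupt, missing):

    // Assumed additions to o.a.h.hdfs.protocol.ClientProtocol.
    public final static int GET_STATS_CAPACITY_IDX = 0;
    public final static int GET_STATS_USED_IDX = 1;
    public final static int GET_STATS_REMAINING_IDX = 2;
    public final static int GET_STATS_UNDER_REPLICATED_IDX = 3;
    public final static int GET_STATS_CORRUPT_BLOCKS_IDX = 4;
    public final static int GET_STATS_MISSING_BLOCKS_IDX = 5;
]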
 
   /**
@@ -3073,9 +2894,7 @@ public class FSNamesystem implements FSC
    */
   @Override // FSNamesystemMBean
   public long getCapacityTotal() {
-    synchronized(heartbeats) {
-      return capacityTotal;
-    }
+    return datanodeStatistics.getCapacityTotal();
   }
 
   @Metric
@@ -3088,9 +2907,7 @@ public class FSNamesystem implements FSC
    */
   @Override // FSNamesystemMBean
   public long getCapacityUsed() {
-    synchronized(heartbeats) {
-      return capacityUsed;
-    }
+    return datanodeStatistics.getCapacityUsed();
   }
 
   @Metric
@@ -3098,32 +2915,9 @@ public class FSNamesystem implements FSC
     return DFSUtil.roundBytesToGB(getCapacityUsed());
   }
 
-  /**
-   * Total used space by data nodes as percentage of total capacity
-   */
-  public float getCapacityUsedPercent() {
-    synchronized(heartbeats){
-      return DFSUtil.getPercentUsed(capacityUsed, capacityTotal);
-    }
-  }
-  /**
-   * Total used space by data nodes for non DFS purposes such
-   * as storing temporary files on the local file system
-   */
-  public long getCapacityUsedNonDFS() {
-    long nonDFSUsed = 0;
-    synchronized(heartbeats){
-      nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
-    }
-    return nonDFSUsed < 0 ? 0 : nonDFSUsed;
-  }
-  /**
-   * Total non-used raw bytes.
-   */
+  @Override
   public long getCapacityRemaining() {
-    synchronized(heartbeats) {
-      return capacityRemaining;
-    }
+    return datanodeStatistics.getCapacityRemaining();
   }
 
   @Metric
@@ -3132,22 +2926,12 @@ public class FSNamesystem implements FSC
   }
 
   /**
-   * Total remaining space by data nodes as percentage of total capacity
-   */
-  public float getCapacityRemainingPercent() {
-    synchronized(heartbeats){
-      return DFSUtil.getPercentRemaining(capacityRemaining, capacityTotal);
-    }
-  }
-  /**
    * Total number of connections.
    */
   @Override // FSNamesystemMBean
   @Metric
   public int getTotalLoad() {
-    synchronized (heartbeats) {
-      return this.totalLoad;
-    }
+    return datanodeStatistics.getXceiverCount();
   }
 
   int getNumberOfDatanodes(DatanodeReportType type) {
@@ -3757,8 +3541,9 @@ public class FSNamesystem implements FSC
     }
     return isInSafeMode();
   }
-  
-  private void checkSafeMode() {
+
+  /** Check and trigger safe mode. */
+  public void checkSafeMode() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode != null) {
@@ -4131,11 +3916,6 @@ public class FSNamesystem implements FSC
     return blockManager.getUnderReplicatedBlocksCount();
   }
 
-  /** Return number of under-replicated but not missing blocks */
-  public long getUnderReplicatedNotMissingBlocks() {
-    return blockManager.getUnderReplicatedNotMissingBlocks();
-  }
-
   /** Returns number of blocks with corrupt replicas */
   @Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
   public long getCorruptReplicaBlocks() {
@@ -4207,14 +3987,14 @@ public class FSNamesystem implements FSC
   /**
    * Sets the generation stamp for this filesystem
    */
-  public void setGenerationStamp(long stamp) {
+  void setGenerationStamp(long stamp) {
     generationStamp.setStamp(stamp);
   }
 
   /**
    * Gets the generation stamp for this filesystem
    */
-  public long getGenerationStamp() {
+  long getGenerationStamp() {
     return generationStamp.getStamp();
   }
 
@@ -4854,31 +4634,27 @@ public class FSNamesystem implements FSC
 
   @Override // NameNodeMXBean
   public long getNonDfsUsedSpace() {
-    return getCapacityUsedNonDFS();
+    return datanodeStatistics.getCapacityUsedNonDFS();
   }
 
   @Override // NameNodeMXBean
   public float getPercentUsed() {
-    return getCapacityUsedPercent();
+    return datanodeStatistics.getCapacityUsedPercent();
   }
 
   @Override // NameNodeMXBean
   public long getBlockPoolUsedSpace() {
-    synchronized(heartbeats) {
-      return blockPoolUsed;
-    }
+    return datanodeStatistics.getBlockPoolUsed();
   }
 
   @Override // NameNodeMXBean
   public float getPercentBlockPoolUsed() {
-    synchronized(heartbeats) {
-      return DFSUtil.getPercentUsed(blockPoolUsed, capacityTotal);
-    }
+    return datanodeStatistics.getPercentBlockPoolUsed();
   }
 
   @Override // NameNodeMXBean
   public float getPercentRemaining() {
-    return getCapacityRemainingPercent();
+    return datanodeStatistics.getCapacityRemainingPercent();
   }
 
   @Override // NameNodeMXBean

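[Reviewer note: every capacity and load figure above now funnels through the new DatanodeStatistics interface, one of the files this commit adds. Collecting just the methods the diff actually invokes gives a plausible sketch of that interface; the real file may declare more, and nothing not called here is guessed:

    // Sketch reconstructed from the call sites in this diff only.
    public interface DatanodeStatistics {
      long getCapacityTotal();
      long getCapacityUsed();
      float getCapacityUsedPercent();
      long getCapacityRemaining();
      float getCapacityRemainingPercent();
      long getCapacityUsedNonDFS();
      long getBlockPoolUsed();
      float getPercentBlockPoolUsed();
      int getXceiverCount();   // backs FSNamesystem.getTotalLoad()
      long[] getStats();       // raw array behind ClientProtocol#getStats()
    }
]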
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Aug  4 22:55:48 2011
@@ -1229,7 +1229,7 @@ public class NameNode implements Namenod
       LOG.warn("Disk error on " + dnName + ": " + msg);
     } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
       LOG.warn("Fatal disk error on " + dnName + ": " + msg);
-      namesystem.removeDatanode(nodeReg);            
+      namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);            
     } else {
       LOG.info("Error report from " + dnName + ": " + msg);
     }
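[Reviewer note: the fatal-disk-error path now reaches the relocated method through the manager chain. A sketch of how DatanodeManager.removeDatanode plausibly looks after the move, reusing the body deleted from FSNamesystem earlier in this patch; the heartbeatManager field and its removeDatanode helper are assumptions:

    // Sketch only: assumed shape inside DatanodeManager after the move.
    public void removeDatanode(final DatanodeID nodeID
        ) throws UnregisteredNodeException {
      namesystem.writeLock();
      try {
        final DatanodeDescriptor node = getDatanode(nodeID);
        if (node != null) {
          // Drop the node from the heartbeat list first, updating the
          // aggregate capacity/load statistics, then let the
          // BlockManager remove its block replicas.
          heartbeatManager.removeDatanode(node);   // assumed helper
          blockManager.removeDatanode(node);
          namesystem.checkSafeMode();   // made public by this commit
        } else {
          NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
              + nodeID.getName() + " does not exist");
        }
      } finally {
        namesystem.writeUnlock();
      }
    }
]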

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Thu Aug  4 22:55:48 2011
@@ -349,7 +349,7 @@ class NamenodeJspHelper {
           + colTxt() + ":" + colTxt() + decommissioning.size() 
           + rowTxt() + colTxt("Excludes missing blocks.")
           + "Number of Under-Replicated Blocks" + colTxt() + ":" + colTxt()
-          + fsn.getUnderReplicatedNotMissingBlocks()
+          + fsn.getBlockManager().getUnderReplicatedNotMissingBlocks()
           + "</table></div><br>\n");
 
       if (live.isEmpty() && dead.isEmpty()) {

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java Thu Aug  4 22:55:48 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 
 /**
 * The test makes sure that NameNode detects presence of blocks that do not have
@@ -56,6 +57,7 @@ public class TestMissingBlocksAlert exte
       cluster = new MiniDFSCluster.Builder(conf).build();
       cluster.waitActive();
 
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
       DistributedFileSystem dfs = 
                             (DistributedFileSystem) cluster.getFileSystem();
 
@@ -86,8 +88,7 @@ public class TestMissingBlocksAlert exte
       }
       assertTrue(dfs.getMissingBlocksCount() == 1);
       assertEquals(4, dfs.getUnderReplicatedBlocksCount());
-      assertEquals(3, 
-          cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
+      assertEquals(3, bm.getUnderReplicatedNotMissingBlocks());
 
 
       // Now verify that it shows up on webui
@@ -109,8 +110,7 @@ public class TestMissingBlocksAlert exte
       }
 
       assertEquals(2, dfs.getUnderReplicatedBlocksCount());
-      assertEquals(2, 
-          cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
+      assertEquals(2, bm.getUnderReplicatedNotMissingBlocks());
 
       // and make sure WARNING disappears
       // Now verify that it shows up on webui

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Thu Aug  4 22:55:48 2011
@@ -25,10 +25,13 @@ import java.util.Set;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.Daemon;
 
 public class BlockManagerTestUtil {
+  public static void setNodeReplicationLimit(final BlockManager blockManager,
+      final int limit) {
+    blockManager.maxReplicationStreams = limit;
+  }
 
   /** @return the datanode descriptor for the given the given storageID. */
   public static DatanodeDescriptor getDatanode(final FSNamesystem ns,

Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java (from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java&r1=1154040&r2=1154042&rev=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java Thu Aug  4 22:55:48 2011
@@ -16,10 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,18 +37,14 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.log4j.Level;
-
-import static org.junit.Assert.*;
 import org.junit.Test;
 
 public class TestBlocksWithNotEnoughRacks {
   public static final Log LOG = LogFactory.getLog(TestBlocksWithNotEnoughRacks.class);
   static {
-    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
   }
 
@@ -278,6 +280,7 @@ public class TestBlocksWithNotEnoughRack
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(racks.length).racks(racks).build();
     final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+    final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
 
     try {
       // Create a file with one block with a replication factor of 2
@@ -293,7 +296,7 @@ public class TestBlocksWithNotEnoughRack
       DataNode dataNode = datanodes.get(idx);
       DatanodeID dnId = dataNode.getDatanodeId();
       cluster.stopDataNode(idx);
-      ns.removeDatanode(dnId);
+      dm.removeDatanode(dnId);
 
       // The block should still have sufficient # replicas, across racks.
       // The last node may not have contained a replica, but if it did
@@ -307,7 +310,7 @@ public class TestBlocksWithNotEnoughRack
       dataNode = datanodes.get(idx);
       dnId = dataNode.getDatanodeId();
       cluster.stopDataNode(idx);
-      ns.removeDatanode(dnId);
+      dm.removeDatanode(dnId);
 
       // Make sure we have enough live replicas even though we are
       // short one rack and therefore need one replica
@@ -332,6 +335,7 @@ public class TestBlocksWithNotEnoughRack
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(racks.length).racks(racks).build();
     final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+    final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
 
     try {
       // Create a file with one block
@@ -347,7 +351,7 @@ public class TestBlocksWithNotEnoughRack
       DataNode dataNode = datanodes.get(2);
       DatanodeID dnId = dataNode.getDatanodeId();
       cluster.stopDataNode(2);
-      ns.removeDatanode(dnId);
+      dm.removeDatanode(dnId);
 
       // The block gets re-replicated to another datanode so it has a 
       // sufficient # replicas, but not across racks, so there should

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java Thu Aug  4 22:55:48 2011
@@ -45,8 +45,8 @@ public class TestComputeInvalidateWork e
       final FSNamesystem namesystem = cluster.getNamesystem();
       final BlockManager bm = namesystem.getBlockManager();
       final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
-      DatanodeDescriptor[] nodes =
-        namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
+      final DatanodeDescriptor[] nodes = bm.getDatanodeManager(
+          ).getHeartbeatManager().getDatanodes();
       assertEquals(nodes.length, NUM_OF_DATANODES);
       
       namesystem.writeLock();

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Thu Aug  4 22:55:48 2011
@@ -52,6 +52,8 @@ public class TestHeartbeatHandling exten
     try {
       cluster.waitActive();
       final FSNamesystem namesystem = cluster.getNamesystem();
+      final HeartbeatManager hm = namesystem.getBlockManager(
+          ).getDatanodeManager().getHeartbeatManager();
       final String poolId = namesystem.getBlockPoolId();
       final DatanodeRegistration nodeReg = 
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
@@ -69,7 +71,7 @@ public class TestHeartbeatHandling exten
 
       try {
         namesystem.writeLock();
-        synchronized (namesystem.heartbeats) {
+        synchronized(hm) {
           for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
             dd.addBlockToBeReplicated(
                 new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);

Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java (from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java&r1=1154040&r2=1154042&rev=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java Thu Aug  4 22:55:48 2011
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.Collection;
 import java.util.Iterator;
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 
 /**
  * Test if live nodes count per node is correct 
@@ -57,6 +59,8 @@ public class TestNodeCount extends TestC
       new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).build();
     try {
       final FSNamesystem namesystem = cluster.getNamesystem();
+      final BlockManager bm = namesystem.getBlockManager();
+      final HeartbeatManager hm = bm.getDatanodeManager().getHeartbeatManager();
       final FileSystem fs = cluster.getFileSystem();
       
       // populate the cluster with a one block file
@@ -66,8 +70,7 @@ public class TestNodeCount extends TestC
       ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
 
       // keep a copy of all datanode descriptor
-      DatanodeDescriptor[] datanodes = 
-         namesystem.heartbeats.toArray(new DatanodeDescriptor[REPLICATION_FACTOR]);
+      final DatanodeDescriptor[] datanodes = hm.getDatanodes();
       
       // start two new nodes
       cluster.startDataNodes(conf, 2, true, null, null);
@@ -80,9 +83,9 @@ public class TestNodeCount extends TestC
       // make sure that NN detects that the datanode is down
       try {
         namesystem.writeLock();
-        synchronized (namesystem.heartbeats) {
+        synchronized (hm) {
           datanode.setLastUpdate(0); // mark it dead
-          namesystem.heartbeatCheck();
+          hm.heartbeatCheck();
         }
       } finally {
         namesystem.writeUnlock();
@@ -102,12 +105,12 @@ public class TestNodeCount extends TestC
       }
       
       // find out a non-excess node
-      Iterator<DatanodeDescriptor> iter = namesystem.getBlockManager().blocksMap
+      final Iterator<DatanodeDescriptor> iter = bm.blocksMap
           .nodeIterator(block.getLocalBlock());
       DatanodeDescriptor nonExcessDN = null;
       while (iter.hasNext()) {
         DatanodeDescriptor dn = iter.next();
-        Collection<Block> blocks = namesystem.getBlockManager().excessReplicateMap.get(dn.getStorageID());
+        Collection<Block> blocks = bm.excessReplicateMap.get(dn.getStorageID());
         if (blocks == null || !blocks.contains(block) ) {
           nonExcessDN = dn;
           break;
@@ -121,9 +124,9 @@ public class TestNodeCount extends TestC
       
       try {
         namesystem.writeLock();
-        synchronized (namesystem.heartbeats) {
+        synchronized(hm) {
           nonExcessDN.setLastUpdate(0); // mark it dead
-          namesystem.heartbeatCheck();
+          hm.heartbeatCheck();
         }
       } finally {
         namesystem.writeUnlock();

Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java (from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java&r1=1154040&r2=1154042&rev=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java Thu Aug  4 22:55:48 2011
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.File;
 import java.io.IOException;
@@ -34,7 +34,9 @@ import org.apache.hadoop.hdfs.TestDatano
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 
 public class TestOverReplicatedBlocks extends TestCase {
   /** Test processOverReplicatedBlock can handle corrupt replicas fine.
@@ -83,13 +85,15 @@ public class TestOverReplicatedBlocks ex
             cluster.getDataNodes().get(2), blockPoolId);
          
       final FSNamesystem namesystem = cluster.getNamesystem();
+      final BlockManager bm = namesystem.getBlockManager();
+      final HeartbeatManager hm = bm.getDatanodeManager().getHeartbeatManager();
       try {
         namesystem.writeLock();
-        synchronized (namesystem.heartbeats) {
+        synchronized(hm) {
           // set live datanode's remaining space to be 0 
           // so they will be chosen to be deleted when over-replication occurs
           String corruptMachineName = corruptDataNode.getName();
-          for (DatanodeDescriptor datanode : namesystem.heartbeats) {
+          for (DatanodeDescriptor datanode : hm.getDatanodes()) {
             if (!corruptMachineName.equals(datanode.getName())) {
               datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0);
             }
@@ -100,8 +104,7 @@ public class TestOverReplicatedBlocks ex
 
           // corrupt one won't be chosen to be excess one
           // without 4910 the number of live replicas would be 0: block gets lost
-          assertEquals(1, namesystem.getBlockManager().countNodes(block.getLocalBlock())
-              .liveReplicas());
+          assertEquals(1, bm.countNodes(block.getLocalBlock()).liveReplicas());
         }
       } finally {
         namesystem.writeUnlock();

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Thu Aug  4 22:55:48 2011
@@ -1128,7 +1128,8 @@ public class NNThroughputBenchmark {
       // decommission data-nodes
       decommissionNodes();
       // set node replication limit
-      namesystem.setNodeReplicationLimit(nodeReplicationLimit);
+      BlockManagerTestUtil.setNodeReplicationLimit(namesystem.getBlockManager(),
+          nodeReplicationLimit);
     }
 
     private void decommissionNodes() throws IOException {
@@ -1171,9 +1172,7 @@ public class NNThroughputBenchmark {
     void printResults() {
       String blockDistribution = "";
       String delim = "(";
-      int totalReplicas = 0;
       for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) {
-        totalReplicas += blockReportObject.datanodes[idx].nrBlocks;
         blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks;
         delim = ", ";
       }

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java Thu Aug  4 22:55:48 2011
@@ -112,10 +112,10 @@ public class TestNamenodeCapacityReport 
       
       configCapacity = namesystem.getCapacityTotal();
       used = namesystem.getCapacityUsed();
-      nonDFSUsed = namesystem.getCapacityUsedNonDFS();
+      nonDFSUsed = namesystem.getNonDfsUsedSpace();
       remaining = namesystem.getCapacityRemaining();
-      percentUsed = namesystem.getCapacityUsedPercent();
-      percentRemaining = namesystem.getCapacityRemainingPercent();
+      percentUsed = namesystem.getPercentUsed();
+      percentRemaining = namesystem.getPercentRemaining();
       bpUsed = namesystem.getBlockPoolUsedSpace();
       percentBpUsed = namesystem.getPercentBlockPoolUsed();