Posted to hdfs-commits@hadoop.apache.org by da...@apache.org on 2014/07/18 20:04:50 UTC

svn commit: r1611739 - in /hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/nam...

Author: daryn
Date: Fri Jul 18 18:04:49 2014
New Revision: 1611739

URL: http://svn.apache.org/r1611739
Log:
svn merge -c 1611737 FIXES: HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
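
Why this helps: every addBlock() runs block placement, and with load-based placement (considerLoad) enabled the old BlockPlacementPolicyDefault recomputed the cluster-average xceiver load on every call as getTotalLoad()/getNumDatanodesInService(). As the FSNamesystem hunk below shows, getNumDatanodesInService() was getNumLiveDataNodes() - getNumDecomLiveDataNodes(), both recomputed from datanode state per call rather than read from a counter, so placement cost grew with cluster size. The patch maintains the aggregates incrementally in HeartbeatManager.Stats and turns the average into a constant-time read. A minimal sketch of the bookkeeping pattern (hypothetical names, not the committed code):

    interface Node {
      boolean isInService();     // neither decommissioning nor decommissioned
      int getXceiverCount();
    }

    class LoadStats {
      private int nodesInService = 0;
      private int inServiceXceivers = 0;

      synchronized void add(Node n) {       // on register / after a heartbeat
        if (n.isInService()) {
          nodesInService++;
          inServiceXceivers += n.getXceiverCount();
        }
      }

      synchronized void subtract(Node n) {  // on removal / before applying a heartbeat
        if (n.isInService()) {
          nodesInService--;
          inServiceXceivers -= n.getXceiverCount();
        }
      }

      // O(1) read; the old path rescanned datanode state on every addBlock call
      synchronized double inServiceXceiverAverage() {
        return nodesInService == 0 ? 0.0 : (double) inServiceXceivers / nodesInService;
      }
    }

A heartbeat update is then subtract(old snapshot) followed by add(new snapshot), the same discipline the HeartbeatManager.Stats hunks below follow.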

Modified:
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
    hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 18 18:04:49 2014
@@ -254,6 +254,8 @@ Release 2.5.0 - UNRELEASED
 
     HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
 
+    HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
+
   BUG FIXES 
 
     HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Jul 18 18:04:49 2014
@@ -336,7 +336,7 @@ public class DatanodeInfo extends Datano
     buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
     buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
     buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
-
+    buffer.append("Xceivers: "+getXceiverCount()+"\n");
     buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
     return buffer.toString();
   }
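
With this line added, the per-node dump built by DatanodeInfo.getDatanodeReport() gains an entry between the cache stats and the last-contact time, e.g. (illustrative value):

    Xceivers: 3

making the load figure that placement now rejects on visible to operators.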

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Jul 18 18:04:49 2014
@@ -636,15 +636,11 @@ public class BlockPlacementPolicyDefault
 
     // check the communication traffic of the target machine
     if (considerLoad) {
-      double avgLoad = 0;
-      if (stats != null) {
-        int size = stats.getNumDatanodesInService();
-        if (size != 0) {
-          avgLoad = (double)stats.getTotalLoad()/size;
-        }
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logNodeIsNotChosen(storage, "the node is too busy ");
+      final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+      final int nodeLoad = node.getXceiverCount();
+      if (nodeLoad > maxLoad) {
+        logNodeIsNotChosen(storage,
+            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
         return false;
       }
     }
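
Worked example with illustrative numbers: if 6 in-service datanodes report a combined 12 xceivers, getInServiceXceiverAverage() is 2.0 and maxLoad is 4.0, so a candidate reporting 5 xceivers is rejected, and the reason is now logged with the actual values ("the node is too busy (load:5 > 4.0)") where the old message carried no numbers. Note the old null/zero guards on stats move behind the new accessor; this hunk assumes stats is non-null whenever considerLoad is set.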

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jul 18 18:04:49 2014
@@ -819,7 +819,9 @@ public class DatanodeManager {
   }
 
   /** Start decommissioning the specified datanode. */
-  private void startDecommission(DatanodeDescriptor node) {
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       for (DatanodeStorageInfo storage : node.getStorageInfos()) {
         LOG.info("Start Decommissioning " + node + " " + storage

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Fri Jul 18 18:04:49 2014
@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
   /** @return the xceiver count */
   public int getXceiverCount();
 
+  /** @return aggregate xceiver count for non-decommission(ing|ed) nodes */
+  public int getInServiceXceiverCount();
+  
+  /** @return number of non-decommission(ing|ed) nodes */
+  public int getNumDatanodesInService();
+  
   /**
    * @return the total used space by data nodes for non-DFS purposes
    * such as storing temporary files on the local file system
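
A sketch of the intended call pattern for the two new accessors (hypothetical variable names; the FSNamesystem hunk below performs exactly this division):

    // stats is any DatanodeStatistics implementation, e.g. the HeartbeatManager
    int nodes = stats.getNumDatanodesInService();
    double avg = (nodes == 0) ? 0.0 : (double) stats.getInServiceXceiverCount() / nodes;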

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Jul 18 18:04:49 2014
@@ -151,6 +151,16 @@ class HeartbeatManager implements Datano
   }
   
   @Override
+  public synchronized int getInServiceXceiverCount() {
+    return stats.nodesInServiceXceiverCount;
+  }
+  
+  @Override
+  public synchronized int getNumDatanodesInService() {
+    return stats.nodesInService;
+  }
+  
+  @Override
   public synchronized long getCacheCapacity() {
     return stats.cacheCapacity;
   }
@@ -178,7 +188,7 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void register(final DatanodeDescriptor d) {
-    if (!datanodes.contains(d)) {
+    if (!d.isAlive) {
       addDatanode(d);
 
       //update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void addDatanode(final DatanodeDescriptor d) {
+    // update in-service node count
+    stats.add(d);
     datanodes.add(d);
     d.isAlive = true;
   }
@@ -323,6 +335,9 @@ class HeartbeatManager implements Datano
     private long cacheCapacity = 0L;
     private long cacheUsed = 0L;
 
+    private int nodesInService = 0;
+    private int nodesInServiceXceiverCount = 0;
+
     private int expiredHeartbeats = 0;
 
     private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ class HeartbeatManager implements Datano
       blockPoolUsed += node.getBlockPoolUsed();
       xceiverCount += node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService++;
+        nodesInServiceXceiverCount += node.getXceiverCount();
         capacityTotal += node.getCapacity();
         capacityRemaining += node.getRemaining();
       } else {
@@ -344,6 +361,8 @@ class HeartbeatManager implements Datano
       blockPoolUsed -= node.getBlockPoolUsed();
       xceiverCount -= node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService--;
+        nodesInServiceXceiverCount -= node.getXceiverCount();
         capacityTotal -= node.getCapacity();
         capacityRemaining -= node.getRemaining();
       } else {
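
Two details worth noting in these hunks. First, register() replaces a linear datanodes.contains(d) scan with the O(1) d.isAlive flag, removing another per-registration cost. Second, the new counters ride the class's existing subtract-then-add discipline (updateHeartbeat subtracts a node's old snapshot before applying the new report and re-adds it), so a node entering or leaving decommission is reflected in nodesInService and nodesInServiceXceiverCount the next time its stats are refreshed; decommission(ing|ed) nodes are excluded exactly as they already were for capacityTotal and capacityRemaining.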

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Jul 18 18:04:49 2014
@@ -48,6 +48,15 @@ public interface FSClusterStats {
    * @return Number of datanodes that are both alive and not decommissioned.
    */
   public int getNumDatanodesInService();
+
+  /**
+   * An indication of the average load of the non-decommission(ing|ed) nodes
+   * eligible for block placement.
+   * 
+   * @return average number of block transfers and block writes currently
+   *         occurring per in-service node in the cluster.
+   */
+  public double getInServiceXceiverAverage();
 }
     
     

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 18 18:04:49 2014
@@ -7319,7 +7319,18 @@ public class FSNamesystem implements Nam
 
   @Override // FSClusterStats
   public int getNumDatanodesInService() {
-    return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+    return datanodeStatistics.getNumDatanodesInService();
+  }
+  
+  @Override // for block placement strategy
+  public double getInServiceXceiverAverage() {
+    double avgLoad = 0;
+    final int nodes = getNumDatanodesInService();
+    if (nodes != 0) {
+      final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+      avgLoad = (double)xceivers/nodes;
+    }
+    return avgLoad;
   }
 
   public SnapshotManager getSnapshotManager() {
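
Illustrative arithmetic: with getInServiceXceiverCount() == 14 and 4 nodes in service, the average is 3.5 and the placement policy's threshold (2.0 * avg) is 7.0. The nodes != 0 guard returns 0.0 when nothing is in service rather than dividing by zero. The replaced getNumDatanodesInService() body subtracted two values that were recomputed from the live-node list on every call, which is the hot path HDFS-6599 measured; both are now single counter reads under the HeartbeatManager lock.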

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java Fri Jul 18 18:04:49 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.PathUtils;
@@ -102,6 +103,7 @@ public class TestReplicationPolicyConsid
     }
   }
 
+  private final double EPSILON = 0.0001;
   /**
    * Tests that chooseTarget with considerLoad set to true correctly calculates
    * load with decommissioned nodes.
@@ -110,14 +112,6 @@ public class TestReplicationPolicyConsid
   public void testChooseTargetWithDecomNodes() throws IOException {
     namenode.getNamesystem().writeLock();
     try {
-      // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
-      // returns false
-      for (int i = 0; i < 3; i++) {
-        DatanodeInfo d = dnManager.getDatanodeByXferAddr(
-            dnrList.get(i).getIpAddr(),
-            dnrList.get(i).getXferPort());
-        d.setDecommissioned();
-      }
       String blockPoolId = namenode.getNamesystem().getBlockPoolId();
       dnManager.handleHeartbeat(dnrList.get(3),
           BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
@@ -134,6 +128,20 @@ public class TestReplicationPolicyConsid
           blockPoolId, dataNodes[5].getCacheCapacity(),
           dataNodes[5].getCacheRemaining(),
           4, 0, 0);
+      // sum of the xceiver counts reported in the above heartbeats
+      final int load = 2 + 4 + 4;
+      
+      FSNamesystem fsn = namenode.getNamesystem();
+      assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
+      
+      // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
+      // returns false
+      for (int i = 0; i < 3; i++) {
+        DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
+        dnManager.startDecommission(d);
+        d.setDecommissioned();
+      }
+      assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
 
       // Call chooseTarget()
       DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
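
The arithmetic the new asserts encode: only nodes 3, 4, and 5 heartbeat with nonzero loads (2 + 4 + 4 = 10), so with all 6 nodes in service the average is 10/6; after the first 3 (zero-load) nodes are decommissioned, the same 10 xceivers spread over 3 in-service nodes gives 10/3. Moving the decommission loop after the first assert, and routing it through dnManager.startDecommission(), is presumably what lets the in-service counters observe the state change (a bare setDecommissioned() would bypass the manager).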

Modified: hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java?rev=1611739&r1=1611738&r2=1611739&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java Fri Jul 18 18:04:49 2014
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 
-import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -28,12 +30,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
 
@@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport 
       if (cluster != null) {cluster.shutdown();}
     }
   }
+  
+  private static final float EPSILON = 0.0001f;
+  @Test
+  public void testXceiverCount() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    // don't waste time retrying if close fails
+    conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
+    MiniDFSCluster cluster = null;
+
+    final int nodes = 8;
+    final int fileCount = 5;
+    final short fileRepl = 3;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build();
+      cluster.waitActive();
+
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+
+      // trigger heartbeats in case not already sent
+      triggerHeartbeats(datanodes);
+      
+      // check that all nodes are live and in service
+      int expectedTotalLoad = nodes;  // xceiver server adds 1 to load
+      int expectedInServiceNodes = nodes;
+      int expectedInServiceLoad = nodes;
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+      
+      // shutdown half the nodes and force a heartbeat check to ensure
+      // counts are accurate
+      for (int i=0; i < nodes/2; i++) {
+        DataNode dn = datanodes.get(i);
+        DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
+        dn.shutdown();
+        dnd.setLastUpdate(0L);
+        BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+        expectedInServiceNodes--;
+        assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      }
+
+      // restart the nodes to verify that counts are correct after
+      // node re-registration 
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      datanodes = cluster.getDataNodes();
+      expectedInServiceNodes = nodes;
+      assertEquals(nodes, datanodes.size());
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+      
+      // create streams and hsync to force datastreamers to start
+      DFSOutputStream[] streams = new DFSOutputStream[fileCount];
+      for (int i=0; i < fileCount; i++) {
+        streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl)
+            .getWrappedStream();
+        streams[i].write("1".getBytes());
+        streams[i].hsync();
+        // the load for writers is 2 because both the write xceiver & packet
+        // responder threads are counted in the load
+        expectedTotalLoad += 2*fileRepl;
+        expectedInServiceLoad += 2*fileRepl;
+      }
+      // force nodes to send load update
+      triggerHeartbeats(datanodes);
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes,
+          namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+
+      // decomm a few nodes, subtract their load from the expected load,
+      // trigger heartbeat to force load update
+      for (int i=0; i < fileRepl; i++) {
+        expectedInServiceNodes--;
+        DatanodeDescriptor dnd =
+            dnm.getDatanode(datanodes.get(i).getDatanodeId());
+        expectedInServiceLoad -= dnd.getXceiverCount();
+        dnm.startDecommission(dnd);
+        DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
+        Thread.sleep(100);
+        assertEquals(nodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes,
+            namesystem.getNumDatanodesInService());
+        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+      
+      // check expected load while closing each stream.  recalc expected
+      // load based on whether the nodes in the pipeline are decomm
+      for (int i=0; i < fileCount; i++) {
+        int decomm = 0;
+        for (DatanodeInfo dni : streams[i].getPipeline()) {
+          DatanodeDescriptor dnd = dnm.getDatanode(dni);
+          expectedTotalLoad -= 2;
+          if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
+            decomm++;
+          } else {
+            expectedInServiceLoad -= 2;
+          }
+        }
+        try {
+          streams[i].close();
+        } catch (IOException ioe) {
+          // nodes may finish decommissioning even while they hold a replica
+          // of an under-construction (UC) block whose other locations are
+          // also decommissioned; ignore that known bug here
+          if (decomm < fileRepl) {
+            throw ioe;
+          }
+        }
+        triggerHeartbeats(datanodes);
+        // verify node count and loads 
+        assertEquals(nodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes,
+            namesystem.getNumDatanodesInService());
+        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+
+      // shutdown each node, verify node counts based on decomm state
+      for (int i=0; i < nodes; i++) {
+        DataNode dn = datanodes.get(i);
+        dn.shutdown();
+        // force it to appear dead so live count decreases
+        DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
+        dnDesc.setLastUpdate(0L);
+        BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+        assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
+        // first few nodes are already out of service
+        if (i >= fileRepl) {
+          expectedInServiceNodes--;
+        }
+        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+        
+        // live nodes always report a load of 1; with no live nodes the load is 0
+        double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
+        assertEquals((double)expectedXceiverAvg,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+      
+      // final sanity check
+      assertEquals(0, namesystem.getNumLiveDataNodes());
+      assertEquals(0, namesystem.getNumDatanodesInService());
+      assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
+      assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  private void triggerHeartbeats(List<DataNode> datanodes)
+      throws IOException, InterruptedException {
+    for (DataNode dn : datanodes) {
+      DataNodeTestUtils.triggerHeartbeat(dn);
+    }
+    Thread.sleep(100);
+  }
 }
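
For reference, the load model testXceiverCount exercises, with these parameters (nodes=8, fileCount=5, fileRepl=3): an idle datanode's xceiver server counts as 1, so a fresh cluster has total load 8 and in-service average 8/8 = 1.0. Each open write pipeline adds 2 per replica (write xceiver plus packet responder), so with 5 streams open the total load is 8 + 5*2*3 = 38 and the average is 38/8 = 4.75. Decommissioning removes a node's current xceiver count from the in-service figures without changing the total, closing a stream subtracts 2 per pipeline node from whichever figures the node still counts toward, and the final shutdown loop walks both counts back to 0.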