Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/01/19 00:24:11 UTC

svn commit: r1060619 - in /hadoop/hdfs/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/...

Author: suresh
Date: Tue Jan 18 23:24:10 2011
New Revision: 1060619

URL: http://svn.apache.org/viewvc?rev=1060619&view=rev
Log:
HDFS-1547. Improve decommission mechanism. Contributed by Suresh Srinivas.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jan 18 23:24:10 2011
@@ -36,6 +36,8 @@ Trunk (unreleased changes)
     HDFS-1539. A config option for the datanode to fsycn a block file
     when block is completely written. (dhruba)
 
+    HDFS-1547. Improve decommission mechanism. (suresh)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml (original)
+++ hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml Tue Jan 18 23:24:10 2011
@@ -209,14 +209,16 @@
    	</li>
     <li>
       <code>-refreshNodes</code>
-      : Updates the set of hosts allowed to connect to namenode.
-      Re-reads the config file to update values defined by dfs.hosts and 
-      dfs.host.exclude and reads the entires (hostnames) in those files. 
-      Each entry not defined in dfs.hosts but in dfs.hosts.exclude 
-      is decommissioned. Each entry defined in dfs.hosts and also in 
-      dfs.host.exclude is stopped from decommissioning if it has aleady 
-      been marked for decommission. Entires not present in both the lists 
-      are decommissioned. 
+      : Updates the namenode with the set of datanodes allowed to 
+      connect to the namenode. The namenode re-reads datanode hostnames 
+      from the files defined by dfs.hosts and dfs.hosts.exclude. Hosts 
+      defined in dfs.hosts are the datanodes that are part of the cluster. 
+      If there are entries in dfs.hosts, only those hosts are 
+      allowed to register with the namenode. Entries in dfs.hosts.exclude 
+      are datanodes that need to be decommissioned. Datanodes complete
+      decommissioning when all of their replicas have been replicated
+      to other datanodes. Decommissioned nodes are not automatically
+      shut down and are not chosen for writing new replicas.
     </li>
     <li>
       <code>-printTopology</code>

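For illustration, here is what the two files and the refresh step might look like (the hostnames are hypothetical; the file locations are whatever dfs.hosts and dfs.hosts.exclude point to in the namenode configuration), using the dfsadmin command described above:

    dfs.hosts (include file):
        datanode1.example.com
        datanode2.example.com
        datanode3.example.com

    dfs.hosts.exclude:
        datanode3.example.com

    $ bin/hadoop dfsadmin -refreshNodes

After the refresh, datanode3.example.com moves to "Decommission In Progress" and, once all of its replicas have been copied to other datanodes, to "Decommissioned"; it keeps heartbeating but is no longer chosen for new replicas.
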
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Jan 18 23:24:10 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.UnsupportedEncodingException;
+import java.util.Comparator;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,6 +33,20 @@ import org.apache.hadoop.net.NodeBase;
 @InterfaceAudience.Private
 public class DFSUtil {
   /**
+   * Comparator for sorting DatanodeInfo[] based on decommissioned state.
+   * Decommissioned nodes are moved to the end of the array on sorting with
+   * this comparator.
+   */
+  public static final Comparator<DatanodeInfo> DECOM_COMPARATOR = 
+    new Comparator<DatanodeInfo>() {
+      @Override
+      public int compare(DatanodeInfo a, DatanodeInfo b) {
+        return a.isDecommissioned() == b.isDecommissioned() ? 0 : 
+          a.isDecommissioned() ? 1 : -1;
+      }
+    };
+  
+  /**
    * Whether the pathname is valid.  Currently prohibits relative paths, 
    * and names which contain a ":" or "/" 
    */

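The new comparator only distinguishes decommissioned from non-decommissioned nodes (equal states compare as 0), so applying it with a stable sort preserves the pre-existing ordering among live replicas. A minimal, self-contained sketch of the same pattern, using a stand-in Node class instead of DatanodeInfo (names here are illustration only):

    import java.util.Arrays;
    import java.util.Comparator;

    public class DecomSortSketch {
      // Stand-in for DatanodeInfo; only the decommissioned flag matters here.
      static class Node {
        final String name;
        final boolean decommissioned;
        Node(String name, boolean decommissioned) {
          this.name = name;
          this.decommissioned = decommissioned;
        }
      }

      // Same shape as DFSUtil.DECOM_COMPARATOR: decommissioned entries sort
      // last, everything else compares as equal.
      static final Comparator<Node> DECOM_COMPARATOR = new Comparator<Node>() {
        @Override
        public int compare(Node a, Node b) {
          return a.decommissioned == b.decommissioned ? 0
              : a.decommissioned ? 1 : -1;
        }
      };

      public static void main(String[] args) {
        Node[] locations = {
            new Node("dn1", true), new Node("dn2", false), new Node("dn3", false)
        };
        // Arrays.sort on objects is a stable merge sort, so dn2 and dn3 keep
        // their relative (e.g. distance-based) order; dn1 moves to the end.
        Arrays.sort(locations, DECOM_COMPARATOR);
        for (Node n : locations) {
          System.out.println(n.name + " decommissioned=" + n.decommissioned);
        }
      }
    }

The FSNamesystem change below applies the real comparator right after pseudoSortByDistance for exactly this reason.
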
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Jan 18 23:24:10 2011
@@ -59,7 +59,22 @@ public class DatanodeInfo extends Datano
   protected String hostName = null;
   
   // administrative states of a datanode
-  public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
+  public enum AdminStates {
+    NORMAL("In Service"), 
+    DECOMMISSION_INPROGRESS("Decommission In Progress"), 
+    DECOMMISSIONED("Decommissioned");
+
+    final String value;
+
+    AdminStates(final String v) {
+      this.value = v;
+    }
+
+    public String toString() {
+      return value;
+    }
+  }
+
   @Nullable
   protected AdminStates adminState;
 
@@ -274,7 +289,7 @@ public class DatanodeInfo extends Datano
   /**
    * Retrieves the admin state of this node.
    */
-  AdminStates getAdminState() {
+  public AdminStates getAdminState() {
     if (adminState == null) {
       return AdminStates.NORMAL;
     }

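With display strings attached to the enum constants, callers can print the admin state directly instead of mapping it by hand (as NamenodeJspHelper previously did). A tiny stand-alone illustration, copying only the enum shape rather than constructing a real DatanodeInfo:

    public class AdminStatesDemo {
      // Same shape as the enum added above, repeated here for illustration only.
      enum AdminStates {
        NORMAL("In Service"),
        DECOMMISSION_INPROGRESS("Decommission In Progress"),
        DECOMMISSIONED("Decommissioned");

        final String value;
        AdminStates(String v) { this.value = v; }
        @Override public String toString() { return value; }
      }

      public static void main(String[] args) {
        // Prints the human-readable form, "Decommission In Progress",
        // rather than the constant name.
        System.out.println(AdminStates.DECOMMISSION_INPROGRESS);
      }
    }
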
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Jan 18 23:24:10 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -262,6 +263,8 @@ public class JspHelper {
         FIELD_NONDFS_USED       = 7,
         FIELD_REMAINING         = 8,
         FIELD_PERCENT_REMAINING = 9,
+        FIELD_ADMIN_STATE       = 10,
+        FIELD_DECOMMISSIONED    = 11,
         SORT_ORDER_ASC          = 1,
         SORT_ORDER_DSC          = 2;
 
@@ -285,6 +288,10 @@ public class JspHelper {
           sortField = FIELD_PERCENT_REMAINING;
         } else if (field.equals("blocks")) {
           sortField = FIELD_BLOCKS;
+        } else if (field.equals("adminstate")) {
+          sortField = FIELD_ADMIN_STATE;
+        } else if (field.equals("decommissioned")) {
+          sortField = FIELD_DECOMMISSIONED;
         } else {
           sortField = FIELD_NAME;
         }
@@ -332,6 +339,13 @@ public class JspHelper {
         case FIELD_BLOCKS:
           ret = d1.numBlocks() - d2.numBlocks();
           break;
+        case FIELD_ADMIN_STATE:
+          ret = d1.getAdminState().toString().compareTo(
+              d2.getAdminState().toString());
+          break;
+        case FIELD_DECOMMISSIONED:
+          ret = DFSUtil.DECOM_COMPARATOR.compare(d1, d2);
+          break;
         case FIELD_NAME: 
           ret = d1.getHostName().compareTo(d2.getHostName());
           break;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Tue Jan 18 23:24:10 2011
@@ -124,6 +124,12 @@ public class DatanodeDescriptor extends 
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
+  
+  /** 
+   * When set to true, the node is not in the include list and is not
+   * allowed to communicate with the namenode.
+   */
+  private boolean disallowed = false;
 
   /** Default constructor */
   public DatanodeDescriptor() {}
@@ -681,4 +687,16 @@ public class DatanodeDescriptor extends 
     super.updateRegInfo(nodeReg);
     volumeFailures = 0;
   }
+  
+  /**
+   * Set the flag to indicate if this datanode is disallowed from communicating
+   * with the namenode.
+   */
+  void setDisallowed(boolean flag) {
+    disallowed = flag;
+  }
+  
+  boolean isDisallowed() {
+    return disallowed;
+  }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jan 18 23:24:10 2011
@@ -779,6 +779,9 @@ public class FSNamesystem implements FSC
           clientMachine);
       for (LocatedBlock b : blocks.getLocatedBlocks()) {
         clusterMap.pseudoSortByDistance(client, b.getLocations());
+        
+        // Move decommissioned datanodes to the bottom
+        Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
       }
     }
     return blocks;
@@ -2559,6 +2562,7 @@ public class FSNamesystem implements FSC
       clusterMap.remove(nodeS);
       nodeS.updateRegInfo(nodeReg);
       nodeS.setHostName(hostName);
+      nodeS.setDisallowed(false); // Node is in the include list
       
       // resolve network location
       resolveNetworkLocation(nodeS);
@@ -2573,6 +2577,7 @@ public class FSNamesystem implements FSC
           nodeS.isAlive = true;
         }
       }
+      checkDecommissioning(nodeS, dnAddress);
       return;
     } 
 
@@ -2593,7 +2598,8 @@ public class FSNamesystem implements FSC
     resolveNetworkLocation(nodeDescr);
     unprotectedAddDatanode(nodeDescr);
     clusterMap.add(nodeDescr);
-      
+    checkDecommissioning(nodeDescr, dnAddress);
+    
     // also treat the registration message as a heartbeat
     synchronized(heartbeats) {
       heartbeats.add(nodeDescr);
@@ -2699,13 +2705,13 @@ public class FSNamesystem implements FSC
         } catch(UnregisteredNodeException e) {
           return new DatanodeCommand[]{DatanodeCommand.REGISTER};
         }
-          
+        
         // Check if this datanode should actually be shutdown instead. 
-        if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) {
+        if (nodeinfo != null && nodeinfo.isDisallowed()) {
           setDatanodeDead(nodeinfo);
           throw new DisallowedDatanodeException(nodeinfo);
         }
-
+         
         if (nodeinfo == null || !nodeinfo.isAlive) {
           return new DatanodeCommand[]{DatanodeCommand.REGISTER};
         }
@@ -2754,18 +2760,28 @@ public class FSNamesystem implements FSC
   private void updateStats(DatanodeDescriptor node, boolean isAdded) {
     //
     // The statistics are protected by the heartbeat lock
+    // For decommissioning/decommissioned nodes, only used capacity
+    // is counted.
     //
     assert(Thread.holdsLock(heartbeats));
     if (isAdded) {
-      capacityTotal += node.getCapacity();
       capacityUsed += node.getDfsUsed();
-      capacityRemaining += node.getRemaining();
       totalLoad += node.getXceiverCount();
+      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        capacityTotal += node.getCapacity();
+        capacityRemaining += node.getRemaining();
+      } else {
+        capacityTotal += node.getDfsUsed();
+      }
     } else {
-      capacityTotal -= node.getCapacity();
       capacityUsed -= node.getDfsUsed();
-      capacityRemaining -= node.getRemaining();
       totalLoad -= node.getXceiverCount();
+      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        capacityTotal -= node.getCapacity();
+        capacityRemaining -= node.getRemaining();
+      } else {
+        capacityTotal -= node.getDfsUsed();
+      }
     }
   }
 
@@ -3072,12 +3088,6 @@ public class FSNamesystem implements FSC
                             + nodeID.getName());
     }
 
-    // Check if this datanode should actually be shutdown instead.
-    if (shouldNodeShutdown(node)) {
-      setDatanodeDead(node);
-      throw new DisallowedDatanodeException(node);
-    }
-    
     blockManager.processReport(node, newReport);
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
     } finally {
@@ -3209,12 +3219,6 @@ public class FSNamesystem implements FSC
                                     +block+" is received from " + nodeID.getName());
     }
 
-    // Check if this datanode should actually be shutdown instead.
-    if (shouldNodeShutdown(node)) {
-      setDatanodeDead(node);
-      throw new DisallowedDatanodeException(node);
-    }
-
     blockManager.addBlock(node, block, delHint);
     } finally {
       writeUnlock();
@@ -3471,12 +3475,16 @@ public class FSNamesystem implements FSC
     throws IOException {
 
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning node " + node.getName());
-      node.startDecommission();
+      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
+          node.numBlocks() +  " blocks.");
+      synchronized (heartbeats) {
+        updateStats(node, false);
+        node.startDecommission();
+        updateStats(node, true);
+      }
       node.decommissioningStatus.setStartTime(now());
-      //
-      // all the blocks that reside on this node have to be 
-      // replicated.
+      
+      // all the blocks that reside on this node have to be replicated.
       checkDecommissionStateInternal(node);
     }
   }
@@ -3486,8 +3494,14 @@ public class FSNamesystem implements FSC
    */
   public void stopDecommission (DatanodeDescriptor node) 
     throws IOException {
-    LOG.info("Stop Decommissioning node " + node.getName());
-    node.stopDecommission();
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stop Decommissioning node " + node.getName());
+      synchronized (heartbeats) {
+        updateStats(node, false);
+        node.stopDecommission();
+        updateStats(node, true);
+      }
+    }
   }
 
   /** 
@@ -3574,10 +3588,7 @@ public class FSNamesystem implements FSC
         LOG.info("Decommission complete for node " + node.getName());
       }
     }
-    if (node.isDecommissioned()) {
-      return true;
-    }
-    return false;
+    return node.isDecommissioned();
   }
 
   /** 
@@ -3627,18 +3638,12 @@ public class FSNamesystem implements FSC
         DatanodeDescriptor node = it.next();
         // Check if not include.
         if (!inHostsList(node, null)) {
-          node.setDecommissioned();  // case 2.
+          node.setDisallowed(true);  // case 2.
         } else {
           if (inExcludedHostsList(node, null)) {
-            if (!node.isDecommissionInProgress() && 
-                !node.isDecommissioned()) {
-              startDecommission(node);   // case 3.
-            }
+            startDecommission(node);   // case 3.
           } else {
-            if (node.isDecommissionInProgress() || 
-                node.isDecommissioned()) {
-              stopDecommission(node);   // case 4.
-            } 
+            stopDecommission(node);   // case 4.
           }
         }
       }
@@ -3654,39 +3659,22 @@ public class FSNamesystem implements FSC
 
   /**
    * Checks if the node is not on the hosts list.  If it is not, then
-   * it will be ignored.  If the node is in the hosts list, but is also 
-   * on the exclude list, then it will be decommissioned.
-   * Returns FALSE if node is rejected for registration. 
-   * Returns TRUE if node is registered (including when it is on the 
-   * exclude list and is being decommissioned). 
+   * it will be disallowed from registering. 
    */
-  private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) 
-    throws IOException {
+  private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
     assert (hasWriteLock());
-    if (!inHostsList(nodeReg, ipAddr)) {
-      return false;    
-    }
-    if (inExcludedHostsList(nodeReg, ipAddr)) {
-      DatanodeDescriptor node = getDatanode(nodeReg);
-      if (node == null) {
-        throw new IOException("verifyNodeRegistration: unknown datanode " +
-                              nodeReg.getName());
-      }
-      if (!checkDecommissionStateInternal(node)) {
-        startDecommission(node);
-      }
-    } 
-    return true;
+    return inHostsList(nodeReg, ipAddr);
   }
     
   /**
-   * Checks if the Admin state bit is DECOMMISSIONED.  If so, then 
-   * we should shut it down. 
-   * 
-   * Returns true if the node should be shutdown.
+   * Decommission the node if it is in the exclude list.
    */
-  private boolean shouldNodeShutdown(DatanodeDescriptor node) {
-    return (node.isDecommissioned());
+  private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) 
+    throws IOException {
+    // If the registered node is in exclude list, then decommission it
+    if (inExcludedHostsList(nodeReg, ipAddr)) {
+      startDecommission(nodeReg);
+    }
   }
     
   /**
@@ -4941,6 +4929,9 @@ public class FSNamesystem implements FSC
     }
   }
   
+  /**
+   * @return list of datanodes where decommissioning is in progress
+   */
   public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
     readLock();
     try {
@@ -4960,10 +4951,9 @@ public class FSNamesystem implements FSC
     }
   }
 
-  /*
-   * Delegation Token
+  /**
+   * Create delegation token secret manager
    */
-   
   private DelegationTokenSecretManager createDelegationTokenSecretManager(
       Configuration conf) {
     return new DelegationTokenSecretManager(conf.getLong(
@@ -5288,6 +5278,7 @@ public class FSNamesystem implements FSC
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("lastContact", getLastContact(node));
       innerinfo.put("usedSpace", getDfsUsed(node));
+      innerinfo.put("adminState", node.getAdminState().toString());
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);
@@ -5305,6 +5296,7 @@ public class FSNamesystem implements FSC
     for (DatanodeDescriptor node : deadNodeList) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("lastContact", getLastContact(node));
+      innerinfo.put("decommissioned", node.isDecommissioned());
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);

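The updateStats() change means a decommissioning or decommissioned node contributes only its used space to the cluster capacity total and nothing to remaining capacity. A minimal sketch of the same accounting rule with hypothetical numbers (this mirrors only the capacity part of the isAdded branch, not the real FSNamesystem fields):

    public class DecomCapacitySketch {
      static long capacityTotal, capacityUsed, capacityRemaining;

      // Mirrors the capacity accounting of updateStats() above.
      static void add(long capacity, long used, long remaining, boolean decom) {
        capacityUsed += used;
        if (!decom) {
          capacityTotal += capacity;
          capacityRemaining += remaining;
        } else {
          capacityTotal += used;   // only used space counts toward capacity
        }
      }

      public static void main(String[] args) {
        add(100, 40, 55, false);  // healthy node: contributes 100/40/55
        add(100, 40, 55, true);   // decommissioning node: contributes 40/40/0
        System.out.println("total=" + capacityTotal        // 140
            + " used=" + capacityUsed                       // 80
            + " remaining=" + capacityRemaining);           // 55
      }
    }
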
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Jan 18 23:24:10 2011
@@ -207,7 +207,17 @@ class NamenodeJspHelper {
       ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
       ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
       fsn.DFSNodesStatus(live, dead);
+      
+      int liveDecommissioned = 0;
+      for (DatanodeDescriptor d : live) {
+        liveDecommissioned += d.isDecommissioned() ? 1 : 0;
+      }
 
+      int deadDecommissioned = 0;
+      for (DatanodeDescriptor d : dead) {
+        deadDecommissioned += d.isDecommissioned() ? 1 : 0;
+      }
+      
       ArrayList<DatanodeDescriptor> decommissioning = fsn
           .getDecommissioningNodes();
 
@@ -292,9 +302,13 @@ class NamenodeJspHelper {
           + colTxt() + StringUtils.limitDecimalTo2(dev) + " %"
           + rowTxt() + colTxt()
           + "<a href=\"dfsnodelist.jsp?whatNodes=LIVE\">Live Nodes</a> "
-          + colTxt() + ":" + colTxt() + live.size() + rowTxt() + colTxt()
+          + colTxt() + ":" + colTxt() + live.size()
+          + " (Decommissioned: " + liveDecommissioned + ")"
+          + rowTxt() + colTxt()
           + "<a href=\"dfsnodelist.jsp?whatNodes=DEAD\">Dead Nodes</a> "
-          + colTxt() + ":" + colTxt() + dead.size() + rowTxt() + colTxt()
+          + colTxt() + ":" + colTxt() + dead.size() 
+          + " (Decommissioned: " + deadDecommissioned + ")"
+          + rowTxt() + colTxt()
           + "<a href=\"dfsnodelist.jsp?whatNodes=DECOMMISSIONING\">"
           + "Decommissioning Nodes</a> "
           + colTxt() + ":" + colTxt() + decommissioning.size() 
@@ -446,8 +460,11 @@ class NamenodeJspHelper {
        */
 
       generateNodeDataHeader(out, d, suffix, alive, nnHttpPort);
-      if (!alive)
+      if (!alive) {
+        out.print("<td class=\"decommissioned\"> " + 
+            d.isDecommissioned() + "\n");
         return;
+      }
 
       long c = d.getCapacity();
       long u = d.getDfsUsed();
@@ -457,9 +474,7 @@ class NamenodeJspHelper {
       String percentRemaining = StringUtils.limitDecimalTo2(d
           .getRemainingPercent());
 
-      String adminState = (d.isDecommissioned() ? "Decommissioned" : (d
-          .isDecommissionInProgress() ? "Decommission In Progress"
-          : "In Service"));
+      String adminState = d.getAdminState().toString();
 
       long timestamp = d.getLastUpdate();
       long currentTime = System.currentTimeMillis();
@@ -502,7 +517,6 @@ class NamenodeJspHelper {
         sorterOrder = "ASC";
 
       JspHelper.sortNodeList(live, sorterField, sorterOrder);
-      JspHelper.sortNodeList(dead, "name", "ASC");
 
       // Find out common suffix. Should this be before or after the sort?
       String port_suffix = null;
@@ -535,7 +549,6 @@ class NamenodeJspHelper {
         int nnHttpPort = nn.getHttpAddress().getPort();
         out.print("<div id=\"dfsnodetable\"> ");
         if (whatNodes.equals("LIVE")) {
-
           out.print("<a name=\"LiveNodes\" id=\"title\">" + "Live Datanodes : "
               + live.size() + "</a>"
               + "<br><br>\n<table border=1 cellspacing=0>\n");
@@ -577,9 +590,11 @@ class NamenodeJspHelper {
 
           if (dead.size() > 0) {
             out.print("<table border=1 cellspacing=0> <tr id=\"row1\"> "
-                + "<td> Node \n");
+                + "<th " + nodeHeaderStr("node")
+                + "> Node <th " + nodeHeaderStr("decommissioned")
+                + "> Decommissioned\n");
 
-            JspHelper.sortNodeList(dead, "name", "ASC");
+            JspHelper.sortNodeList(dead, sorterField, sorterOrder);
             for (int i = 0; i < dead.size(); i++) {
               generateNodeData(out, dead.get(i), port_suffix, false, nnHttpPort);
             }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Tue Jan 18 23:24:10 2011
@@ -501,16 +501,18 @@ public class DFSAdmin extends FsShell {
     "Set/Unset/Check flag to attempt restore of failed storage replicas if they become available.\n" +
     "\t\tRequires superuser permissions.\n";
     
-    String refreshNodes = "-refreshNodes: \tUpdates the set of hosts allowed " +
-                          "to connect to namenode.\n\n" +
-      "\t\tRe-reads the config file to update values defined by \n" +
-      "\t\tdfs.hosts and dfs.host.exclude and reads the \n" +
-      "\t\tentires (hostnames) in those files.\n\n" +
-      "\t\tEach entry not defined in dfs.hosts but in \n" + 
-      "\t\tdfs.hosts.exclude is decommissioned. Each entry defined \n" +
-      "\t\tin dfs.hosts and also in dfs.host.exclude is stopped from \n" +
-      "\t\tdecommissioning if it has aleady been marked for decommission.\n" + 
-      "\t\tEntires not present in both the lists are decommissioned.\n";
+    String refreshNodes = "-refreshNodes: \tUpdates the namenode with the " +
+      "set of datanodes allowed to connect to the namenode.\n\n" +
+      "\t\tNamenode re-reads datanode hostnames from the file defined by \n" +
+      "\t\tdfs.hosts, dfs.hosts.exclude configuration parameters.\n" +
+      "\t\tHosts defined in dfs.hosts are the datanodes that are part of \n" +
+      "\t\tthe cluster. If there are entries in dfs.hosts, only the hosts \n" +
+      "\t\tin it are allowed to register with the namenode.\n\n" +
+      "\t\tEntries in dfs.hosts.exclude are datanodes that need to be \n" +
+      "\t\tdecommissioned. Datanodes complete decommissioning when \n" + 
+      "\t\tall the replicas from them are replicated to other datanodes.\n" +
+      "\t\tDecommissioned nodes are not automatically shutdown and \n" +
+      "\t\tare not chosen for writing new replicas.\n";
 
     String finalizeUpgrade = "-finalizeUpgrade: Finalize upgrade of HDFS.\n" +
       "\t\tDatanodes delete their previous version working directories,\n" +

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml Tue Jan 18 23:24:10 2011
@@ -15899,39 +15899,47 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-refreshNodes:( |\t)*Updates the set of hosts allowed to connect to namenode.( )*</expected-output>
+          <expected-output>^-refreshNodes:( |\t)*Updates the namenode with the set of datanodes allowed to connect to the namenode.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Re-reads the config file to update values defined by( )*</expected-output>
+          <expected-output>^( |\t)*Namenode re-reads datanode hostnames from the file defined by( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*dfs.hosts and dfs.host.exclude and reads the( )*</expected-output>
+          <expected-output>^( |\t)*dfs.hosts, dfs.hosts.exclude configuration parameters.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*entires \(hostnames\) in those files.( )*</expected-output>
+          <expected-output>^( |\t)*Hosts defined in dfs.hosts are the datanodes that are part of( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Each entry not defined in dfs.hosts but in( )*</expected-output>
+          <expected-output>^( |\t)*the cluster. If there are entries in dfs.hosts, only the hosts( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*dfs.hosts.exclude is decommissioned. Each entry defined( )*</expected-output>
+          <expected-output>^( |\t)*in it are allowed to register with the namenode.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*in dfs.hosts and also in dfs.host.exclude is stopped from( )*</expected-output>
+          <expected-output>^( |\t)*Entries in dfs.hosts.exclude are datanodes that need to be( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*decommissioning if it has aleady been marked for decommission.( )*</expected-output>
+          <expected-output>^( |\t)*decommissioned. Datanodes complete decommissioning when ( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Entires not present in both the lists are decommissioned.( )*</expected-output>
+          <expected-output>^( |\t)*all the replicas from them are replicated to other datanodes.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*Decommissioned nodes are not automatically shutdown and( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*are not chosen for writing new replicas.( )*</expected-output>
         </comparator>
       </comparators>
     </test>

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jan 18 23:24:10 2011
@@ -20,9 +20,12 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.channels.FileChannel;
@@ -86,6 +89,7 @@ public class MiniDFSCluster {
     private long [] simulatedCapacities = null;
     // wait until namenode has left safe mode?
     private boolean waitSafeMode = true;
+    private boolean setupHostsFile = false;
     
     public Builder(Configuration conf) {
       this.conf = conf;
@@ -172,6 +176,15 @@ public class MiniDFSCluster {
     }
     
     /**
+     * Default: false.
+     * When true, the hosts file/include file for the cluster is set up.
+     */
+    public Builder setupHostsFile(boolean val) {
+      this.setupHostsFile = val;
+      return this;
+    }
+    
+    /**
      * Construct the actual MiniDFSCluster
      */
     public MiniDFSCluster build() throws IOException {
@@ -193,7 +206,8 @@ public class MiniDFSCluster {
                        builder.racks,
                        builder.hosts,
                        builder.simulatedCapacities,
-                       builder.waitSafeMode);
+                       builder.waitSafeMode,
+                       builder.setupHostsFile);
   }
   
   public class DataNodeProperties {
@@ -390,13 +404,14 @@ public class MiniDFSCluster {
                         long[] simulatedCapacities) throws IOException {
     initMiniDFSCluster(nameNodePort, conf, numDataNodes, format,
         manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
-        simulatedCapacities, true);
+        simulatedCapacities, true, false);
   }
 
   private void initMiniDFSCluster(int nameNodePort, Configuration conf,
       int numDataNodes, boolean format, boolean manageNameDfsDirs,
       boolean manageDataDfsDirs, StartupOption operation, String[] racks,
-      String[] hosts, long[] simulatedCapacities, boolean waitSafeMode)
+      String[] hosts, long[] simulatedCapacities, boolean waitSafeMode,
+      boolean setupHostsFile)
     throws IOException {
     this.conf = conf;
     base_dir = new File(getBaseDirectory());
@@ -461,8 +476,8 @@ public class MiniDFSCluster {
     nameNode = NameNode.createNameNode(args, conf);
     
     // Start the DataNodes
-    startDataNodes(conf, numDataNodes, manageDataDfsDirs, 
-                    operation, racks, hosts, simulatedCapacities);
+    startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
+        hosts, simulatedCapacities, setupHostsFile);
     waitClusterUp();
 
     //make sure ProxyUsers uses the latest conf
@@ -531,7 +546,8 @@ public class MiniDFSCluster {
   public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
                              boolean manageDfsDirs, StartupOption operation, 
                              String[] racks, String[] hosts,
-                             long[] simulatedCapacities) throws IOException {
+                             long[] simulatedCapacities,
+                             boolean setupHostsFile) throws IOException {
 
     int curDatanodesNum = dataNodes.size();
     // for mincluster's the default initialDelay for BRs is 0
@@ -577,12 +593,6 @@ public class MiniDFSCluster {
           + "] is less than the number of datanodes [" + numDataNodes + "].");
     }
 
-    // Set up the right ports for the datanodes
-    conf.set("dfs.datanode.address", "127.0.0.1:0");
-    conf.set("dfs.datanode.http.address", "127.0.0.1:0");
-    conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
-    
-
     String [] dnArgs = (operation == null ||
                         operation != StartupOption.ROLLBACK) ?
         null : new String[] {operation.getName()};
@@ -590,6 +600,8 @@ public class MiniDFSCluster {
     
     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
       Configuration dnConf = new HdfsConfiguration(conf);
+      // Set up datanode address
+      setupDatanodeAddress(dnConf, setupHostsFile);
       if (manageDfsDirs) {
         File dir1 = new File(data_dir, "data"+(2*i+1));
         File dir2 = new File(data_dir, "data"+(2*i+2));
@@ -671,7 +683,8 @@ public class MiniDFSCluster {
       boolean manageDfsDirs, StartupOption operation, 
       String[] racks
       ) throws IOException {
-    startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null, null);
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
+        null, false);
   }
   
   /**
@@ -701,7 +714,7 @@ public class MiniDFSCluster {
                              String[] racks,
                              long[] simulatedCapacities) throws IOException {
     startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
-                   simulatedCapacities);
+                   simulatedCapacities, false);
     
   }
   /**
@@ -1197,4 +1210,48 @@ public class MiniDFSCluster {
   public static String getBaseDirectory() {
     return System.getProperty("test.build.data", "build/test/data") + "/dfs/";
   }
+  
+  private int getFreeSocketPort() {
+    int port = 0;
+    try {
+      ServerSocket s = new ServerSocket(0);
+      port = s.getLocalPort();
+      s.close();
+      return port;
+    } catch (IOException e) {
+      // Could not get a free port. Return default port 0.
+    }
+    return port;
+  }
+  
+  private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile) throws IOException {
+    if (setupHostsFile) {
+      String hostsFile = conf.get("dfs.hosts", "").trim();
+      if (hostsFile.length() == 0) {
+        throw new IOException("Parameter dfs.hosts is not setup in conf");
+      }
+      // Setup datanode in the include file, if it is defined in the conf
+      String address = "127.0.0.1:" + getFreeSocketPort();
+      conf.set("dfs.datanode.address", address);
+      addToFile(hostsFile, address);
+      LOG.info("Adding datanode " + address + " to hosts file " + hostsFile);
+    } else {
+      conf.set("dfs.datanode.address", "127.0.0.1:0");
+      conf.set("dfs.datanode.http.address", "127.0.0.1:0");
+      conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
+    }
+  }
+  
+  private void addToFile(String p, String address) throws IOException {
+    File f = new File(p);
+    if (!f.exists()) {
+      f.createNewFile();
+    }
+    PrintWriter writer = new PrintWriter(new FileWriter(f, true));
+    try {
+      writer.println(address);
+    } finally {
+      writer.close();
+    }
+  }
 }

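A hedged usage sketch for the new builder option (the dfs.hosts path below is hypothetical; setupDatanodeAddress() rejects an empty dfs.hosts, and TestDecommission.testHostsFile below exercises the same flow in full):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class HostsFileClusterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // dfs.hosts must be set to a writable path; each datanode's address
        // is appended to this include file as the datanodes start. The path
        // is hypothetical and may need to exist before the namenode starts.
        conf.set("dfs.hosts", "/tmp/dfs-hosts");
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numDataNodes(1)
            .setupHostsFile(true)
            .build();
        cluster.waitActive();
        cluster.shutdown();
      }
    }
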
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Tue Jan 18 23:24:10 2011
@@ -24,8 +24,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,36 +33,68 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * This class tests the decommissioning of nodes.
  */
-public class TestDecommission extends TestCase {
+public class TestDecommission {
+  public static final Log LOG = LogFactory.getLog(TestDecommission.class);
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int fileSize = 16384;
-  static final int numDatanodes = 6;
-
+  static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
 
   Random myrand = new Random();
   Path hostsFile;
   Path excludeFile;
-
-  ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
-
-  private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
-
-  private void writeConfigFile(FileSystem fs, Path name, ArrayList<String> nodes) 
+  FileSystem localFileSys;
+  Configuration conf;
+  MiniDFSCluster cluster = null;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    // Set up the hosts/exclude files.
+    localFileSys = FileSystem.getLocal(conf);
+    Path workingDir = localFileSys.getWorkingDirectory();
+    Path dir = new Path(workingDir, "build/test/data/work-dir/decommission");
+    hostsFile = new Path(dir, "hosts");
+    excludeFile = new Path(dir, "exclude");
+    
+    // Setup conf
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
+    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
+    writeConfigFile(excludeFile, null);
+  }
+  
+  @After
+  public void teardown() throws IOException {
+    cleanupFile(localFileSys, excludeFile.getParent());
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+  
+  private void writeConfigFile(Path name, ArrayList<String> nodes) 
     throws IOException {
-
     // delete if it already exists
-    if (fs.exists(name)) {
-      fs.delete(name, true);
+    if (localFileSys.exists(name)) {
+      localFileSys.delete(name, true);
     }
 
-    FSDataOutputStream stm = fs.create(name);
+    FSDataOutputStream stm = localFileSys.create(name);
     
     if (nodes != null) {
       for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
@@ -87,23 +119,17 @@ public class TestDecommission extends Te
     stm.close();
   }
   
-  
-  private void checkFile(FileSystem fileSys, Path name, int repl)
-    throws IOException {
-    DFSTestUtil.waitReplication(fileSys, name, (short) repl);
-  }
-
   private void printFileLocations(FileSystem fileSys, Path name)
   throws IOException {
     BlockLocation[] locations = fileSys.getFileBlockLocations(
         fileSys.getFileStatus(name), 0, fileSize);
     for (int idx = 0; idx < locations.length; idx++) {
       String[] loc = locations[idx].getHosts();
-      System.out.print("Block[" + idx + "] : ");
+      StringBuilder buf = new StringBuilder("Block[" + idx + "] : ");
       for (int j = 0; j < loc.length; j++) {
-        System.out.print(loc[j] + " ");
+        buf.append(loc[j] + " ");
       }
-      System.out.println("");
+      LOG.info(buf.toString());
     }
   }
 
@@ -112,7 +138,7 @@ public class TestDecommission extends Te
    * replication factor is 1 more than the specified one.
    */
   private void checkFile(FileSystem fileSys, Path name, int repl,
-                         String downnode) throws IOException {
+                         String downnode, int numDatanodes) throws IOException {
     //
     // sleep an additional 10 seconds for the blockreports from the datanodes
     // to arrive. 
@@ -126,16 +152,24 @@ public class TestDecommission extends Te
 
     for (LocatedBlock blk : dinfo) { // for each block
       int hasdown = 0;
+      int firstDecomNodeIndex = -1;
       DatanodeInfo[] nodes = blk.getLocations();
       for (int j = 0; j < nodes.length; j++) {     // for each replica
         if (nodes[j].getName().equals(downnode)) {
           hasdown++;
-          System.out.println("Block " + blk.getBlock() + " replica " +
-                             nodes[j].getName() + " is decommissioned.");
+          LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName()
+              + " is decommissioned.");
+        }
+        if (nodes[j].isDecommissioned()) {
+          if (firstDecomNodeIndex == -1) {
+            firstDecomNodeIndex = j;
+          }
+          continue;
         }
+        assertEquals("Decom node is not at the end", -1, firstDecomNodeIndex);
       }
-      System.out.println("Block " + blk.getBlock() + " has " + hasdown +
-                         " decommissioned replica.");
+      LOG.info("Block " + blk.getBlock() + " has " + hasdown
+          + " decommissioned replica.");
       assertEquals("Number of replicas for block" + blk.getBlock(),
                    Math.min(numDatanodes, repl+hasdown), nodes.length);  
     }
@@ -148,21 +182,21 @@ public class TestDecommission extends Te
   }
 
   private void printDatanodeReport(DatanodeInfo[] info) {
-    System.out.println("-------------------------------------------------");
+    LOG.info("-------------------------------------------------");
     for (int i = 0; i < info.length; i++) {
-      System.out.println(info[i].getDatanodeReport());
-      System.out.println();
+      LOG.info(info[i].getDatanodeReport());
+      LOG.info("");
     }
   }
 
   /*
    * decommission one random node.
    */
-  private String decommissionNode(FSNamesystem namesystem,
-                                  Configuration conf,
-                                  DFSClient client, 
-                                  FileSystem localFileSys)
+  private DatanodeInfo decommissionNode(FSNamesystem namesystem,
+                                  ArrayList<String>decommissionedNodes,
+                                  AdminStates waitForState)
     throws IOException {
+    DFSClient client = getDfsClient(cluster, conf);
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
 
     //
@@ -177,122 +211,188 @@ public class TestDecommission extends Te
       }
     }
     String nodename = info[index].getName();
-    System.out.println("Decommissioning node: " + nodename);
+    LOG.info("Decommissioning node: " + nodename);
 
     // write nodename into the exclude file.
     ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
     nodes.add(nodename);
-    writeConfigFile(localFileSys, excludeFile, nodes);
+    writeConfigFile(excludeFile, nodes);
     namesystem.refreshNodes(conf);
-    return nodename;
-  }
-
-  /*
-   * Check if node is in the requested state.
-   */
-  private boolean checkNodeState(FileSystem filesys, 
-                                 String node, 
-                                 NodeState state) throws IOException {
-    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
-    boolean done = false;
-    boolean foundNode = false;
-    DatanodeInfo[] datanodes = dfs.getDataNodeStats();
-    for (int i = 0; i < datanodes.length; i++) {
-      DatanodeInfo dn = datanodes[i];
-      if (dn.getName().equals(node)) {
-        if (state == NodeState.DECOMMISSIONED) {
-          done = dn.isDecommissioned();
-        } else if (state == NodeState.DECOMMISSION_INPROGRESS) {
-          done = dn.isDecommissionInProgress();
-        } else {
-          done = (!dn.isDecommissionInProgress() && !dn.isDecommissioned());
-        }
-        System.out.println(dn.getDatanodeReport());
-        foundNode = true;
-      }
-    }
-    if (!foundNode) {
-      throw new IOException("Could not find node: " + node);
-    }
-    return done;
+    DatanodeInfo ret = namesystem.getDatanode(info[index]);
+    waitNodeState(ret, waitForState);
+    return ret;
   }
 
   /* 
    * Wait till node is fully decommissioned.
    */
-  private void waitNodeState(FileSystem filesys,
-                             String node,
-                             NodeState state) throws IOException {
-    boolean done = checkNodeState(filesys, node, state);
+  private void waitNodeState(DatanodeInfo node,
+                             AdminStates state) throws IOException {
+    boolean done = state == node.getAdminState();
     while (!done) {
-      System.out.println("Waiting for node " + node +
-                         " to change state to " + state);
+      LOG.info("Waiting for node " + node + " to change state to "
+          + state + " current state: " + node.getAdminState());
       try {
-        Thread.sleep(1000);
+        Thread.sleep(HEARTBEAT_INTERVAL * 1000);
       } catch (InterruptedException e) {
         // nothing
       }
-      done = checkNodeState(filesys, node, state);
+      done = state == node.getAdminState();
     }
   }
   
-  /**
-   * Tests Decommission in DFS.
-   */
-  public void testDecommission() throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-
-    // Set up the hosts/exclude files.
-    FileSystem localFileSys = FileSystem.getLocal(conf);
-    Path workingDir = localFileSys.getWorkingDirectory();
-    Path dir = new Path(workingDir, "build/test/data/work-dir/decommission");
-    assertTrue(localFileSys.mkdirs(dir));
-    hostsFile = new Path(dir, "hosts");
-    excludeFile = new Path(dir, "exclude");
-    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
-    conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
-    writeConfigFile(localFileSys, excludeFile, null);
-
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-                                               .numDataNodes(numDatanodes).build();
-    cluster.waitActive();
+  /* Get DFSClient to the namenode */
+  private static DFSClient getDfsClient(MiniDFSCluster cluster,
+      Configuration conf) throws IOException {
     InetSocketAddress addr = new InetSocketAddress("localhost", 
                                                    cluster.getNameNodePort());
-    DFSClient client = new DFSClient(addr, conf);
+    return new DFSClient(addr, conf);
+  }
+  
+  /* Validate cluster has expected number of datanodes */
+  private static void validateCluster(DFSClient client, int numDNs)
+      throws IOException {
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
-    assertEquals("Number of Datanodes ", numDatanodes, info.length);
-    FileSystem fileSys = cluster.getFileSystem();
+    assertEquals("Number of Datanodes ", numDNs, info.length);
+  }
+  
+  /** Start a MiniDFSCluster 
+   * @throws IOException */
+  private void startCluster(int numDatanodes, Configuration conf)
+      throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+        .build();
+    cluster.waitActive();
+    DFSClient client = getDfsClient(cluster, conf);
+    validateCluster(client, numDatanodes);
+  }
+  
+  private void verifyStats(NameNode namenode, FSNamesystem fsn,
+      DatanodeInfo node, boolean decommissioning) throws InterruptedException {
+    // Do the stats check over 10 iterations
+    for (int i = 0; i < 10; i++) {
+      long[] newStats = namenode.getStats();
+
+      // For decommissioning nodes, ensure capacity of the DN is no longer
+      // counted. Only used space of the DN is counted in cluster capacity
+      assertEquals(newStats[0], decommissioning ? node.getDfsUsed() : 
+        node.getCapacity());
+
+      // Ensure cluster used capacity is counted for both normal and
+      // decommissioning nodes
+      assertEquals(newStats[1], node.getDfsUsed());
+
+      // For decommissioning nodes, remaining space from the DN is not counted
+      assertEquals(newStats[2], decommissioning ? 0 : node.getRemaining());
+
+      // Ensure transceiver count is same as that DN
+      assertEquals(fsn.getTotalLoad(), node.getXceiverCount());
+      
+      Thread.sleep(HEARTBEAT_INTERVAL * 1000); // Sleep heart beat interval
+    }
+  }
 
+  /**
+   * Tests Decommission in DFS.
+   */
+  @Test
+  public void testDecommission() throws IOException {
+    LOG.info("Starting test testDecommission");
+    int numDatanodes = 6;
+    startCluster(numDatanodes, conf);
     try {
+      DFSClient client = getDfsClient(cluster, conf);
+      FileSystem fileSys = cluster.getFileSystem();
+      FSNamesystem fsn = cluster.getNamesystem();
+      ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
       for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
         int replicas = numDatanodes - iteration - 1;
-        //
+        
         // Decommission one node. Verify that node is decommissioned.
-        // 
-        Path file1 = new Path("decommission.dat");
+        Path file1 = new Path("testDecommission.dat");
         writeFile(fileSys, file1, replicas);
-        System.out.println("Created file decommission.dat with " +
-                           replicas + " replicas.");
-        checkFile(fileSys, file1, replicas);
+        LOG.info("Created file decommission.dat with " + replicas
+            + " replicas.");
         printFileLocations(fileSys, file1);
-        String downnode = decommissionNode(cluster.getNamesystem(), conf,
-                                           client, localFileSys);
-        decommissionedNodes.add(downnode);
-        waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
-        checkFile(fileSys, file1, replicas, downnode);
+        DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
+            AdminStates.DECOMMISSIONED);
+        decommissionedNodes.add(downnode.getName());
+        
+        // Ensure decommissioned datanode is not automatically shutdown
+        assertEquals("All datanodes must be alive", numDatanodes, 
+            client.datanodeReport(DatanodeReportType.LIVE).length);
+        
+        checkFile(fileSys, file1, replicas, downnode.getName(), numDatanodes);
         cleanupFile(fileSys, file1);
-        cleanupFile(localFileSys, dir);
       }
+      
+      // Restart the cluster and ensure decommissioned datanodes
+      // are allowed to register with the namenode
+      cluster.shutdown();
+      startCluster(numDatanodes, conf);
     } catch (IOException e) {
-      info = client.datanodeReport(DatanodeReportType.ALL);
+      DFSClient client = getDfsClient(cluster, conf);
+      DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
       printDatanodeReport(info);
       throw e;
-    } finally {
-      fileSys.close();
-      cluster.shutdown();
     }
   }
+  
+  /**
+   * Tests cluster storage statistics during decommissioning
+   */
+  @Test
+  public void testClusterStats() throws IOException, InterruptedException {
+    LOG.info("Starting test testClusterStats");
+    int numDatanodes = 1;
+    startCluster(numDatanodes, conf);
+    
+    FileSystem fileSys = cluster.getFileSystem();
+    Path file = new Path("testClusterStats.dat");
+    writeFile(fileSys, file, 1);
+    
+    FSNamesystem fsn = cluster.getNamesystem();
+    NameNode namenode = cluster.getNameNode();
+    ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
+    DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
+        AdminStates.DECOMMISSION_INPROGRESS);
+    // Check namenode stats for multiple datanode heartbeats
+    verifyStats(namenode, fsn, downnode, true);
+    
+    // Stop decommissioning and verify stats
+    writeConfigFile(excludeFile, null);
+    fsn.refreshNodes(conf);
+    DatanodeInfo ret = fsn.getDatanode(downnode);
+    waitNodeState(ret, AdminStates.NORMAL);
+    verifyStats(namenode, fsn, ret, false);
+  }
+  
+  /**
+   * Test host file or include file functionality. Only datanodes
+   * in the include file are allowed to connect to the namenode.
+   */
+  @Test
+  public void testHostsFile() throws IOException, InterruptedException {
+    conf.set("dfs.hosts", hostsFile.toUri().getPath());
+    int numDatanodes = 1;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+        .setupHostsFile(true).build();
+    cluster.waitActive();
+    
+    // Now remove the datanode from the hosts file and ensure it is
+    // disallowed from talking to the namenode, resulting in its shutdown.
+    ArrayList<String>list = new ArrayList<String>();
+    list.add("invalidhost");
+    writeConfigFile(hostsFile, list);
+    cluster.getNamesystem().refreshNodes(conf);
+    
+    DFSClient client = getDfsClient(cluster, conf);
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+    for (int i = 0 ; i < 5 && info.length != 0; i++) {
+      LOG.info("Waiting for datanode to be marked dead");
+      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+      info = client.datanodeReport(DatanodeReportType.LIVE);
+    }
+    assertEquals("Number of live nodes should be 0", 0, info.length);
+  }
 }