You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2009/11/24 16:07:50 UTC

svn commit: r883723 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: suresh
Date: Tue Nov 24 15:07:49 2009
New Revision: 883723

URL: http://svn.apache.org/viewvc?rev=883723&view=rev
Log:
HDFS-758. Add decommissioning status page to Namenode Web UI. Contributed by Jitendra Nath Pandey.

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=883723&r1=883722&r2=883723&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 24 15:07:49 2009
@@ -182,6 +182,9 @@
     HDFS-702. Add HDFS implementation of AbstractFileSystem. 
     (Sanjay Radio via suresh)
 
+    HDFS-758. Add decommissioning status page to Namenode Web UI.
+    (Jitendra Nath Pandey via suresh)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -534,7 +537,7 @@
 
     HDFS-737. Add full path name of the file to the block information and 
     summary of total number of files, blocks, live and deadnodes to 
-    metasave output. (Jitendra Pandey via suresh)
+    metasave output. (Jitendra Nath Pandey via suresh)
     
   BUG FIXES
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=883723&r1=883722&r2=883723&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Nov 24 15:07:49 2009
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.mortbay.log.Log;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -1407,12 +1408,38 @@
     return new NumberReplicas(live, count, corrupt, excess);
   }
 
+  private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
+      NumberReplicas num) {
+    int curReplicas = num.liveReplicas();
+    int curExpectedReplicas = getReplication(block);
+    INode fileINode = blocksMap.getINode(block);
+    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
+    StringBuffer nodeList = new StringBuffer();
+    while (nodeIter.hasNext()) {
+      DatanodeDescriptor node = nodeIter.next();
+      nodeList.append(node.name);
+      nodeList.append(" ");
+    }
+    FSNamesystem.LOG.info("Block: " + block + ", Expected Replicas: "
+        + curExpectedReplicas + ", live replicas: " + curReplicas
+        + ", corrupt replicas: " + num.corruptReplicas()
+        + ", decommissioned replicas: " + num.decommissionedReplicas()
+        + ", excess replicas: " + num.excessReplicas()
+        + ", Is Open File: " + fileINode.isUnderConstruction()
+        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+        + srcNode.name + ", Is current datanode decommissioning: "
+        + srcNode.isDecommissionInProgress());
+  }
+  
   /**
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
    */
   boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
+    int underReplicatedBlocks = 0;
+    int decommissionOnlyReplicas = 0;
+    int underReplicatedInOpenFiles = 0;
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     while(it.hasNext()) {
       final Block block = it.next();
@@ -1424,15 +1451,25 @@
         int curExpectedReplicas = getReplication(block);
         if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
           if (curExpectedReplicas > curReplicas) {
-            //Set to true only if strictly under-replicated
-            status = true;
+            //Log info about one block for this node which needs replication
+            if (!status) {
+              status = true;
+              logBlockReplicationInfo(block, srcNode, num);
+            }
+            underReplicatedBlocks++;
+            if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
+              decommissionOnlyReplicas++;
+            }
+            if (fileINode.isUnderConstruction()) {
+              underReplicatedInOpenFiles++;
+            }
           }
           if (!neededReplications.contains(block) &&
             pendingReplications.getNumReplicas(block) == 0) {
             //
             // These blocks have been reported from the datanode
             // after the startDecommission method has been executed. These
-            // blocks were in flight when the decommission was started.
+            // blocks were in flight when the decommissioning was started.
             //
             neededReplications.add(block,
                                    curReplicas,
@@ -1442,6 +1479,9 @@
         }
       }
     }
+    srcNode.decommissioningStatus.set(underReplicatedBlocks,
+        decommissionOnlyReplicas, 
+        underReplicatedInOpenFiles);
     return status;
   }
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=883723&r1=883722&r2=883723&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Tue Nov 24 15:07:49 2009
@@ -47,6 +47,11 @@
 
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
+  
+  // Stores status of decommissioning.
+  // If node is not decommissioning, do not use this object for anything.
+  DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
+  
   /** Block and targets pair */
   public static class BlockTargetPair {
     public final Block block;
@@ -589,4 +594,53 @@
     // by DatanodeID
     return (this == obj) || super.equals(obj);
   }
+  
+  class DecommissioningStatus {
+    int underReplicatedBlocks;
+    int decommissionOnlyReplicas;
+    int underReplicatedInOpenFiles;
+    long startTime;
+    
+    synchronized void set(int underRep,
+        int onlyRep, int underConstruction) {
+      if (isDecommissionInProgress() == false) {
+        return;
+      }
+      underReplicatedBlocks = underRep;
+      decommissionOnlyReplicas = onlyRep;
+      underReplicatedInOpenFiles = underConstruction;
+    }
+    
+    synchronized int getUnderReplicatedBlocks() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return underReplicatedBlocks;
+    }
+    synchronized int getDecommissionOnlyReplicas() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return decommissionOnlyReplicas;
+    }
+
+    synchronized int getUnderReplicatedInOpenFiles() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return underReplicatedInOpenFiles;
+    }
+
+    synchronized void setStartTime(long time) {
+      startTime = time;
+    }
+    
+    synchronized long getStartTime() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return startTime;
+    }
+  }  // End of class DecommissioningStatus
+  
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=883723&r1=883722&r2=883723&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 24 15:07:49 2009
@@ -3094,14 +3094,11 @@
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       LOG.info("Start Decommissioning node " + node.getName());
       node.startDecommission();
+      node.decommissioningStatus.setStartTime(now());
       //
       // all the blocks that reside on this node have to be 
       // replicated.
-      Iterator<? extends Block> decommissionBlocks = node.getBlockIterator();
-      while(decommissionBlocks.hasNext()) {
-        Block block = decommissionBlocks.next();
-        blockManager.updateNeededReplications(block, -1, 0);
-      }
+      checkDecommissionStateInternal(node);
     }
   }
 
@@ -4320,4 +4317,17 @@
                                                   startingBlockId);
   }
 
+  public synchronized ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+    ArrayList<DatanodeDescriptor> decommissioningNodes = 
+        new ArrayList<DatanodeDescriptor>();
+    ArrayList<DatanodeDescriptor> results = 
+        getDatanodeListForReport(DatanodeReportType.LIVE);
+    for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+      DatanodeDescriptor node = it.next();
+      if (node.isDecommissionInProgress()) {
+        decommissioningNodes.add(node);
+      }
+    }
+    return decommissioningNodes;
+  }  
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=883723&r1=883722&r2=883723&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Nov 24 15:07:49 2009
@@ -165,6 +165,9 @@
       ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
       fsn.DFSNodesStatus(live, dead);
 
+      ArrayList<DatanodeDescriptor> decommissioning = fsn
+          .getDecommissioningNodes();
+
       sorterField = request.getParameter("sorter/field");
       sorterOrder = request.getParameter("sorter/order");
       if (sorterField == null)
@@ -217,7 +220,14 @@
           + "<a href=\"dfsnodelist.jsp?whatNodes=LIVE\">Live Nodes</a> "
           + colTxt() + ":" + colTxt() + live.size() + rowTxt() + colTxt()
           + "<a href=\"dfsnodelist.jsp?whatNodes=DEAD\">Dead Nodes</a> "
-          + colTxt() + ":" + colTxt() + dead.size() + "</table></div><br>\n");
+          + colTxt() + ":" + colTxt() + dead.size() + rowTxt() + colTxt()
+          + "<a href=\"dfsnodelist.jsp?whatNodes=DECOMMISSIONING\">"
+          + "Decommissioning Nodes</a> "
+          + colTxt() + ":" + colTxt() + decommissioning.size() 
+          + rowTxt() + colTxt()
+          + "Number of Under-Replicated Blocks" + colTxt() + ":" + colTxt()
+          + fsn.getUnderReplicatedBlocks()
+          + "</table></div><br>\n");
 
       if (live.isEmpty() && dead.isEmpty()) {
         out.print("There are no datanodes in the cluster");
@@ -282,6 +292,44 @@
       return ret;
     }
 
+    void generateDecommissioningNodeData(JspWriter out, DatanodeDescriptor d,
+        String suffix, boolean alive, int nnHttpPort) throws IOException {
+      String url = "http://" + d.getHostName() + ":" + d.getInfoPort()
+          + "/browseDirectory.jsp?namenodeInfoPort=" + nnHttpPort + "&dir="
+          + URLEncoder.encode("/", "UTF-8");
+
+      String name = d.getHostName() + ":" + d.getPort();
+      if (!name.matches("\\d+\\.\\d+.\\d+\\.\\d+.*"))
+        name = name.replaceAll("\\.[^.:]*", "");
+      int idx = (suffix != null && name.endsWith(suffix)) ? name
+          .indexOf(suffix) : -1;
+
+      out.print(rowTxt() + "<td class=\"name\"><a title=\"" + d.getHost() + ":"
+          + d.getPort() + "\" href=\"" + url + "\">"
+          + ((idx > 0) ? name.substring(0, idx) : name) + "</a>"
+          + ((alive) ? "" : "\n"));
+      if (!alive) {
+        return;
+      }
+
+      long decommRequestTime = d.decommissioningStatus.getStartTime();
+      long timestamp = d.getLastUpdate();
+      long currentTime = System.currentTimeMillis();
+      long hoursSinceDecommStarted = (currentTime - decommRequestTime)/3600000;
+      long remainderMinutes = ((currentTime - decommRequestTime)/60000) % 60;
+      out.print("<td class=\"lastcontact\"> "
+          + ((currentTime - timestamp) / 1000)
+          + "<td class=\"underreplicatedblocks\">"
+          + d.decommissioningStatus.getUnderReplicatedBlocks()
+          + "<td class=\"blockswithonlydecommissioningreplicas\">"
+          + d.decommissioningStatus.getDecommissionOnlyReplicas() 
+          + "<td class=\"underrepblocksinfilesunderconstruction\">"
+          + d.decommissioningStatus.getUnderReplicatedInOpenFiles()
+          + "<td class=\"timesincedecommissionrequest\">"
+          + hoursSinceDecommStarted + " hrs " + remainderMinutes + " mins"
+          + "\n");
+    }
+    
     void generateNodeData(JspWriter out, DatanodeDescriptor d,
         String suffix, boolean alive, int nnHttpPort) throws IOException {
       /*
@@ -432,7 +480,7 @@
             }
           }
           out.print("</table>\n");
-        } else {
+        } else if (whatNodes.equals("DEAD")) {
 
           out.print("<br> <a name=\"DeadNodes\" id=\"title\"> "
               + " Dead Datanodes : " + dead.size() + "</a><br><br>\n");
@@ -448,6 +496,35 @@
 
             out.print("</table>\n");
           }
+        } else if (whatNodes.equals("DECOMMISSIONING")) {
+          // Decommissioning Nodes
+          ArrayList<DatanodeDescriptor> decommissioning = nn.getNamesystem()
+              .getDecommissioningNodes();
+          out.print("<br> <a name=\"DecommissioningNodes\" id=\"title\"> "
+              + " Decommissioning Datanodes : " + decommissioning.size()
+              + "</a><br><br>\n");
+          if (decommissioning.size() > 0) {
+            out.print("<table border=1 cellspacing=0> <tr class=\"headRow\"> "
+                + "<th " + nodeHeaderStr("name") 
+                + "> Node <th " + nodeHeaderStr("lastcontact")
+                + "> Last <br>Contact <th "
+                + nodeHeaderStr("underreplicatedblocks")
+                + "> Under Replicated Blocks <th "
+                + nodeHeaderStr("blockswithonlydecommissioningreplicas")
+                + "> Blocks With No <br> Live Replicas <th "
+                + nodeHeaderStr("underrepblocksinfilesunderconstruction")
+                + "> Under Replicated Blocks <br> In Files Under Construction" 
+                + " <th " + nodeHeaderStr("timesincedecommissionrequest")
+                + "> Time Since Decommissioning Started"
+                );
+
+            JspHelper.sortNodeList(decommissioning, "name", "ASC");
+            for (int i = 0; i < decommissioning.size(); i++) {
+              generateDecommissioningNodeData(out, decommissioning.get(i),
+                  port_suffix, true, nnHttpPort);
+            }
+            out.print("</table>\n");
+          }
         }
         out.print("</div>");
       }
@@ -644,4 +721,4 @@
       doc.getWriter().flush();
     }
   }    
-}
\ No newline at end of file
+}

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java?rev=883723&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java Tue Nov 24 15:07:49 2009
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
+
+/**
+ * This class tests the decommissioning of nodes.
+ */
+public class TestDecommissioningStatus {
+  private static final long seed = 0xDEADBEEFL;
+  private static final int blockSize = 8192;
+  private static final int fileSize = 16384;
+  private static final int numDatanodes = 2;
+  private static MiniDFSCluster cluster;
+  private static FileSystem fileSys;
+  private static Path excludeFile;
+  private static FileSystem localFileSys;
+  private static Configuration conf;
+  private static Path dir;
+
+  ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
+  
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+
+    // Set up the hosts/exclude files.
+    localFileSys = FileSystem.getLocal(conf);
+    Path workingDir = localFileSys.getWorkingDirectory();
+    dir = new Path(workingDir, "build/test/data/work-dir/decommission");
+    assertTrue(localFileSys.mkdirs(dir));
+    excludeFile = new Path(dir, "exclude");
+    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+    conf
+        .setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
+        4);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
+    writeConfigFile(localFileSys, excludeFile, null);
+
+    cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
+    cluster.waitActive();
+    fileSys = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(fileSys != null) fileSys.close();
+    if(cluster != null) cluster.shutdown();
+  }
+
+  private static void writeConfigFile(FileSystem fs, Path name,
+      ArrayList<String> nodes) throws IOException {
+
+    // delete if it already exists
+    if (fs.exists(name)) {
+      fs.delete(name, true);
+    }
+
+    FSDataOutputStream stm = fs.create(name);
+
+    if (nodes != null) {
+      for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
+        String node = it.next();
+        stm.writeBytes(node);
+        stm.writeBytes("\n");
+      }
+    }
+    stm.close();
+  }
+
+  private void writeFile(FileSystem fileSys, Path name, short repl)
+      throws IOException {
+    // create and write a file that contains three blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt("io.file.buffer.size", 4096), repl, (long) blockSize);
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    stm.close();
+  }
+ 
+  private FSDataOutputStream writeIncompleteFile(FileSystem fileSys, Path name,
+      short repl) throws IOException {
+    // create and write a file that contains three blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt("io.file.buffer.size", 4096), repl, (long) blockSize);
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    // Do not close stream, return it
+    // so that it is not garbage collected
+    return stm;
+  }
+  
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name, true);
+    assertTrue(!fileSys.exists(name));
+  }
+
+  /*
+   * Decommissions the node at the given index
+   */
+  private String decommissionNode(FSNamesystem namesystem, Configuration conf,
+      DFSClient client, FileSystem localFileSys, int nodeIndex)
+      throws IOException {
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+
+    String nodename = info[nodeIndex].getName();
+    System.out.println("Decommissioning node: " + nodename);
+
+    // write nodename into the exclude file.
+    ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
+    nodes.add(nodename);
+    writeConfigFile(localFileSys, excludeFile, nodes);
+    namesystem.refreshNodes(conf);
+    return nodename;
+  }
+
+  private void checkDecommissionStatus(DatanodeDescriptor decommNode,
+      int expectedUnderRep, int expectedDecommissionOnly,
+      int expectedUnderRepInOpenFiles) {
+    assertEquals(decommNode.decommissioningStatus.getUnderReplicatedBlocks(),
+        expectedUnderRep);
+    assertEquals(
+        decommNode.decommissioningStatus.getDecommissionOnlyReplicas(),
+        expectedDecommissionOnly);
+    assertEquals(decommNode.decommissioningStatus
+        .getUnderReplicatedInOpenFiles(), expectedUnderRepInOpenFiles);
+  }
+  
+  /**
+   * Tests Decommissioning Status in DFS.
+   */
+
+  @Test
+  public void testDecommissionStatus() throws IOException, InterruptedException {
+    InetSocketAddress addr = new InetSocketAddress("localhost", cluster
+        .getNameNodePort());
+    DFSClient client = new DFSClient(addr, conf);
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+    assertEquals("Number of Datanodes ", 2, info.length);
+    FileSystem fileSys = cluster.getFileSystem();
+
+    short replicas = 2;
+    //
+    // Decommission one node. Verify the decommission status
+    // 
+    Path file1 = new Path("decommission.dat");
+    writeFile(fileSys, file1, replicas);
+
+    Path file2 = new Path("decommission1.dat");
+    FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas);
+    Thread.sleep(5000);
+
+    FSNamesystem fsn = cluster.getNamesystem();
+    for (int iteration = 0; iteration < numDatanodes; iteration++) {
+      String downnode = decommissionNode(fsn, conf, client, localFileSys,
+          iteration);
+      decommissionedNodes.add(downnode);
+      Thread.sleep(5000);
+      ArrayList<DatanodeDescriptor> decommissioningNodes = fsn
+          .getDecommissioningNodes();
+      if (iteration == 0) {
+        assertEquals(decommissioningNodes.size(), 1);
+        DatanodeDescriptor decommNode = decommissioningNodes.get(0);
+        checkDecommissionStatus(decommNode, 4, 0, 2);
+      } else {
+        assertEquals(decommissioningNodes.size(), 2);
+        DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
+        DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
+        checkDecommissionStatus(decommNode1, 4, 4, 2);
+        checkDecommissionStatus(decommNode2, 4, 4, 2);
+      }
+    }
+    // Call refreshNodes on FSNamesystem with empty exclude file.
+    // This will remove the datanodes from decommissioning list and
+    // make them available again.
+    writeConfigFile(localFileSys, excludeFile, null);
+    fsn.refreshNodes(conf);
+    st1.close();
+    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file2);
+    cleanupFile(localFileSys, dir);
+  }
+}