Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/04 20:14:42 UTC

svn commit: r492695 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/ src/webapps/dfs/ src/webapps/static/

Author: cutting
Date: Thu Jan  4 11:14:39 2007
New Revision: 492695

URL: http://svn.apache.org/viewvc?view=rev&rev=492695
Log:
HADOOP-681.  Add to HDFS the ability to decommission nodes.  Contributed by Dhruba.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp
    lucene/hadoop/trunk/src/webapps/static/hadoop.css

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan  4 11:14:39 2007
@@ -188,6 +188,10 @@
 53. HADOOP-840.  In task tracker, queue task cleanups and perform them
     in a separate thread.  (omalley & Mahadev Konar via cutting)
 
+54. HADOOP-681.  Add to HDFS the ability to decommission nodes.  This
+    causes their blocks to be re-replicated on other nodes, so that
+    they may be removed from a cluster.  (Dhruba Borthakur via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Thu Jan  4 11:14:39 2007
@@ -29,7 +29,7 @@
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-  public static final long versionID = 3L;  // setSafeMode() added
+  public static final long versionID = 4L; // decommission node added
   
     ///////////////////////////////////////
     // File contents
@@ -300,4 +300,6 @@
      * @author Konstantin Shvachko
      */
     public boolean setSafeMode( FSConstants.SafeModeAction action ) throws IOException;
+
+    public boolean decommission( FSConstants.DecommissionAction action, String[] nodenames) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Thu Jan  4 11:14:39 2007
@@ -86,8 +86,6 @@
      * @exception IOException if the filesystem does not exist.
      */
     public void setSafeMode(String[] argv, int idx) throws IOException {
-      final String safeModeUsage = "Usage: java DFSAdmin -safemode "
-                                   + "[enter | leave | get]";
       if (!(fs instanceof DistributedFileSystem)) {
         System.out.println("FileSystem is " + fs.getName());
         return;
@@ -134,6 +132,65 @@
     }
 
     /**
+     * Command related to decommissioning of datanode(s).
+     * Usage: java DFSAdmin -decommission set | clear | get [datanode1[, datanode2...]]
+     * @param argv List of command line parameters. Each of these items
+              could be a hostname or a hostname:portname.
+     * @param idx The index of the command that is being processed.
+     * @exception IOException if the filesystem does not exist.
+     * @return 0 on success, non zero on error.
+     */
+    public int decommission(String[] argv, int idx) throws IOException {
+      int exitCode = -1;
+
+      if (!(fs instanceof DistributedFileSystem)) {
+        System.out.println("FileSystem is " + fs.getName());
+        return exitCode;
+      }
+      if (idx >= argv.length - 1) {
+        printUsage("-decommission");
+        return exitCode;
+      }
+      
+      //
+      // Copy all the datanode names to nodes[]
+      //
+      String[] nodes = new String[argv.length - idx - 1];
+      for (int i = idx + 1, j = 0; i < argv.length; i++, j++) {
+        nodes[j] = argv[i];
+      }
+
+      FSConstants.DecommissionAction action;
+
+      if ("set".equalsIgnoreCase(argv[idx])) {
+        action = FSConstants.DecommissionAction.DECOMMISSION_SET;
+      } else if ("clear".equalsIgnoreCase(argv[idx])) {
+        action = FSConstants.DecommissionAction.DECOMMISSION_CLEAR;
+      } else if ("get".equalsIgnoreCase(argv[idx])) {
+        action = FSConstants.DecommissionAction.DECOMMISSION_GET;
+      } else {
+        printUsage("-decommission");
+        return exitCode;
+      }
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      boolean mode = dfs.decommission(action, nodes);
+
+      if (action == FSConstants.DecommissionAction.DECOMMISSION_GET) {
+        if (mode) {
+          System.out.println("Node(s) has finished decommission");
+        }
+        else {
+          System.out.println("Node(s) have not yet been decommissioned");
+        }
+        return 0;
+      }
+      if (mode) {
+        return 0; // success
+      }
+      return exitCode;
+    }
+
+    /**
      * Displays format of commands.
      * @param cmd The command that is being executed.
      */
@@ -144,10 +201,15 @@
           } else if ("-safemode".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
                 + " [-safemode enter | leave | get | wait]");
+          } else if ("-decommission".equals(cmd)) {
+            System.err.println("Usage: java DFSAdmin"
+                + " [-decommission set | clear | get "
+                + "[datanode1[, datanode2..]]");
           } else {
             System.err.println("Usage: java DFSAdmin");
             System.err.println("           [-report]");
             System.err.println("           [-safemode enter | leave | get | wait]");
+            System.err.println("           [-decommission set | clear | get]");
           }
     }
 
@@ -180,6 +242,11 @@
                   printUsage(cmd);
                   return exitCode;
                 }
+        } else if ("-decommission".equals(cmd)) {
+                if (argv.length < 2) {
+                  printUsage(cmd);
+                  return exitCode;
+                }
         }
 
         // initialize DFSAdmin
@@ -200,6 +267,8 @@
                 report();
             } else if ("-safemode".equals(cmd)) {
                 setSafeMode(argv, i);
+            } else if ("-decommission".equals(cmd)) {
+                exitCode = decommission(argv, i);
             } else {
                 exitCode = -1;
                 System.err.println(cmd.substring(1) + ": Unknown command");

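For reference, a brief usage sketch of the new subcommand as the usage strings
above describe it. The datanode names below are placeholders, and depending on
the installation the class may be invoked through the bin/hadoop wrapper rather
than directly:

    java DFSAdmin -decommission set datanode1.example.com datanode2.example.com
    java DFSAdmin -decommission get datanode1.example.com datanode2.example.com
    java DFSAdmin -decommission clear datanode1.example.com datanode2.example.com

"set" marks the listed datanodes for decommission, "get" reports whether all of
them have finished re-replicating their blocks, and "clear" returns them to
normal service.
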
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Jan  4 11:14:39 2007
@@ -362,6 +362,18 @@
     }
 
     /**
+     * Set or clear the decommission state of datanode(s).
+     * See {@link ClientProtocol#decommission(FSConstants.DecommissionAction, String[])}
+     * for more details.
+     * 
+     * @see ClientProtocol#decommission(FSConstants.DecommissionAction, String[])
+     */
+    public boolean decommission(DecommissionAction action, String[] nodes)
+                                throws IOException {
+      return namenode.decommission(action, nodes);
+    }
+
+    /**
      */
     public boolean mkdirs(UTF8 src) throws IOException {
         checkOpen();
@@ -526,6 +538,14 @@
             }
             this.blocks = newBlocks;
             this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+        }
+
+        /**
+         * Used by the automated tests to determine the block locations of a
+         * file.
+         */
+        synchronized DatanodeInfo[][] getDataNodes() {
+          return nodes;
         }
 
         /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Thu Jan  4 11:14:39 2007
@@ -27,6 +27,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
 
 /** 
  * DatanodeInfo represents the status of a DataNode.
@@ -42,8 +43,14 @@
   protected long lastUpdate;
   protected int xceiverCount;
 
+  // administrative states of a datanode
+  public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
+  protected AdminStates adminState;
+
+
   DatanodeInfo() {
     super();
+    adminState = null;
   }
   
   DatanodeInfo( DatanodeInfo from ) {
@@ -52,6 +59,7 @@
     this.remaining = from.getRemaining();
     this.lastUpdate = from.getLastUpdate();
     this.xceiverCount = from.getXceiverCount();
+    this.adminState = from.adminState;
   }
 
   DatanodeInfo( DatanodeID nodeID ) {
@@ -60,6 +68,7 @@
     this.remaining = 0L;
     this.lastUpdate = 0L;
     this.xceiverCount = 0;
+    this.adminState = null;
   }
   
   /** The raw capacity. */
@@ -101,6 +110,13 @@
     long r = getRemaining();
     long u = c - r;
     buffer.append("Name: "+name+"\n");
+    if (isDecommissioned()) {
+      buffer.append("State          : Decommissioned\n");
+    } else if (isDecommissionInProgress()) {
+      buffer.append("State          : Decommission in progress\n");
+    } else {
+      buffer.append("State          : In Service\n");
+    }
     buffer.append("Total raw bytes: "+c+" ("+FsShell.byteDesc(c)+")"+"\n");
     buffer.append("Used raw bytes: "+u+" ("+FsShell.byteDesc(u)+")"+"\n");
     buffer.append("% used: "+FsShell.limitDecimal(((1.0*u)/c)*100,2)+"%"+"\n");
@@ -108,6 +124,72 @@
     return buffer.toString();
   }
 
+  /**
+   * Start decommissioning a node; its admin state becomes
+   * DECOMMISSION_INPROGRESS regardless of its old state.
+   */
+  void startDecommission() {
+    adminState = AdminStates.DECOMMISSION_INPROGRESS;
+  }
+
+  /**
+   * Stop decommissioning a node; its admin state reverts to
+   * NORMAL regardless of its old state.
+   */
+  void stopDecommission() {
+    adminState = null;
+  }
+
+  /**
+   * Returns true if the node is in the process of being decommissioned
+   */
+   boolean isDecommissionInProgress() {
+     if (adminState == AdminStates.DECOMMISSION_INPROGRESS) {
+       return true;
+     }
+     return false;
+   }
+
+  /**
+   * Returns true if the node has been decommissioned.
+   */
+   boolean isDecommissioned() {
+     if (adminState == AdminStates.DECOMMISSIONED) {
+       return true;
+     }
+     return false;
+   }
+
+  /**
+   * Sets the admin state to indicate that decommission is complete.
+   */
+   void setDecommissioned() {
+     assert isDecommissionInProgress();
+     adminState = AdminStates.DECOMMISSIONED;
+   }
+
+   /**
+    * Retrieves the admin state of this node.
+    */
+    AdminStates getAdminState() {
+      if (adminState == null) {
+        return AdminStates.NORMAL;
+      }
+      return adminState;
+    }
+
+   /**
+    * Sets the admin state of this node.
+    */
+    void setAdminState(AdminStates newState) {
+      if (newState == AdminStates.NORMAL) {
+        adminState = null;
+      }
+      else {
+        adminState = newState;
+      }
+    }
+
   /////////////////////////////////////////////////
   // Writable
   /////////////////////////////////////////////////
@@ -127,6 +209,7 @@
     out.writeLong(remaining);
     out.writeLong(lastUpdate);
     out.writeInt(xceiverCount);
+    WritableUtils.writeEnum(out, getAdminState());
   }
 
   /**
@@ -137,5 +220,8 @@
     this.remaining = in.readLong();
     this.lastUpdate = in.readLong();
     this.xceiverCount = in.readInt();
+    AdminStates newState = (AdminStates) WritableUtils.readEnum(in,
+                                         AdminStates.class);
+    setAdminState(newState);
   }
 }

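The write()/readFields() changes above append the admin state to DatanodeInfo's
wire format via WritableUtils. Below is a minimal, self-contained sketch of that
enum round trip, not part of this commit; the local enum stands in for
DatanodeInfo.AdminStates so the example compiles outside the dfs package:

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class AdminStateWireExample {
      // Stand-in for DatanodeInfo.AdminStates, kept local for self-containment.
      public enum AdminStates { NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED }

      public static void main(String[] args) throws IOException {
        // Serialize the enum the same way DatanodeInfo.write() now does.
        DataOutputBuffer out = new DataOutputBuffer();
        WritableUtils.writeEnum(out, AdminStates.DECOMMISSION_INPROGRESS);

        // Read it back, as DatanodeInfo.readFields() now does.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        AdminStates state =
            (AdminStates) WritableUtils.readEnum(in, AdminStates.class);
        System.out.println("round-tripped state: " + state);
      }
    }
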
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Thu Jan  4 11:14:39 2007
@@ -281,7 +281,7 @@
         return used;
     }
 
-    /** Return statistics for each datanode.*/
+    /** Return statistics for each datanode. */
     public DatanodeInfo[] getDataNodeStats() throws IOException {
       return dfs.datanodeReport();
     }
@@ -294,5 +294,14 @@
     public boolean setSafeMode( FSConstants.SafeModeAction action ) 
     throws IOException {
       return dfs.setSafeMode( action );
+    }
+
+    /**
+     * Set or clear the decommission state of a set of datanodes.
+     */
+    public boolean decommission(FSConstants.DecommissionAction action,
+                                String[] nodes)
+    throws IOException {
+      return dfs.decommission(action, nodes);
     }
 }

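To show the end-to-end flow that the new DistributedFileSystem.decommission()
call enables, here is a minimal client-side sketch, not part of this commit,
that mirrors what TestDecommission below does. It assumes the default
FileSystem named by the Configuration is a DFS instance, and the datanode name
is a placeholder:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.DistributedFileSystem;
    import org.apache.hadoop.dfs.FSConstants;
    import org.apache.hadoop.fs.FileSystem;

    public class DecommissionExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (!(fs instanceof DistributedFileSystem)) {
          throw new IOException("Not a DistributedFileSystem: " + fs.getName());
        }
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        String[] nodes = { "datanode1.example.com" };  // placeholder host name

        // Mark the node for decommission; the namenode starts re-replicating
        // its blocks onto other datanodes.
        dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes);

        // Poll until every listed node has finished decommissioning.
        while (!dfs.decommission(
                   FSConstants.DecommissionAction.DECOMMISSION_GET, nodes)) {
          try { Thread.sleep(5000L); } catch (InterruptedException e) {}
        }

        // Optionally return the node to normal service afterwards.
        dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_CLEAR, nodes);
      }
    }

DECOMMISSION_GET only reports true once none of the blocks hosted on the listed
nodes remain queued for re-replication, which is why the sketch polls.
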
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu Jan  4 11:14:39 2007
@@ -122,6 +122,9 @@
     // SafeMode actions
     public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
 
+    // decommission administrative actions
+    public enum DecommissionAction{ DECOMMISSION_SET, DECOMMISSION_CLEAR, DECOMMISSION_GET; }
+
     // Version is reflected in the dfs image and edit log files.
     // Version is reflected in the data storage file.
     // Versions are negative.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Thu Jan  4 11:14:39 2007
@@ -258,8 +258,9 @@
             if( logVersion > -3 )
               throw new IOException("Unexpected opcode " + opcode 
                   + " for version " + logVersion );
-            DatanodeDescriptor node = new DatanodeDescriptor();
-            node.readFields(in);
+            FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
+            nodeimage.readFields(in);
+            DatanodeDescriptor node = nodeimage.getDatanodeDescriptor();
             fsNamesys.unprotectedAddDatanode( node );
             break;
           }
@@ -376,7 +377,7 @@
    * registration event.
    */
   void logAddDatanode( DatanodeDescriptor node ) {
-    logEdit( OP_DATANODE_ADD, node, null );
+    logEdit( OP_DATANODE_ADD, new FSImage.DatanodeImage(node), null );
   }
   
   /** 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Thu Jan  4 11:14:39 2007
@@ -74,7 +74,7 @@
     }
     this.editLog = new FSEditLog( edits );
   }
-  
+
   FSEditLog getEditLog() {
     return editLog;
   }
@@ -344,7 +344,7 @@
     }
   }
 
-  class DatanodeImage implements WritableComparable {
+  static class DatanodeImage implements WritableComparable {
 
     /**************************************************
      * DatanodeImage is used to store persistent information

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan  4 11:14:39 2007
@@ -717,7 +717,10 @@
         // the blocks.
         for (int i = 0; i < nrBlocks; i++) {
           SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
-            if (containingNodes.size() < pendingFile.getReplication()) {
+          // filter out containingNodes that are marked for decommission.
+          int numCurrentReplica = countContainingNodes(containingNodes);
+
+            if (numCurrentReplica < pendingFile.getReplication()) {
                    NameNode.stateChangeLog.debug(
                           "DIR* NameSystem.completeFile:"
                         + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
@@ -1585,20 +1588,25 @@
             FSDirectory.INode fileINode = dir.getFileByBlock(block);
             if( fileINode == null )  // block does not belong to any file
                 return;
+
+            // filter out containingNodes that are marked for decommission.
+            int numCurrentReplica = countContainingNodes(containingNodes);
+
             // check whether safe replication is reached for the block
             // only if it is a part of a files
-            incrementSafeBlockCount( containingNodes.size() );
+            incrementSafeBlockCount( numCurrentReplica );
             short fileReplication = fileINode.getReplication();
-            if (containingNodes.size() >= fileReplication ) {
+            if (numCurrentReplica >= fileReplication ) {
                 neededReplications.remove(block);
                 pendingReplications.remove(block);
                 NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
-                        +block.getBlockName()+" has "+containingNodes.size()
+                        +block.getBlockName()+" has "+ numCurrentReplica
                         +" replicas so is removed from neededReplications and pendingReplications" );
-            } else {// containingNodes.size() < fileReplication
+
+            } else {// numCurrentReplica < fileReplication
                 neededReplications.add(block);
                 NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
-                    +block.getBlockName()+" has only "+containingNodes.size()
+                    +block.getBlockName()+" has only "+ numCurrentReplica
                     +" replicas so is added to neededReplications" );
             }
 
@@ -1620,7 +1628,9 @@
           DatanodeDescriptor cur = it.next();
           Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
           if (excessBlocks == null || ! excessBlocks.contains(block)) {
+            if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
               nonExcess.add(cur);
+            }
           }
       }
       chooseExcessReplicates(nonExcess, block, replication);    
@@ -1811,6 +1821,145 @@
           }
       }
     }
+
+    /**
+     * Start decommissioning the specified datanodes. If a datanode is
+     * already being decommissioned, then this is a no-op.
+     */
+    public synchronized void startDecommission (String[] nodes) 
+                             throws IOException {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot decommission node ", safeMode);
+      }
+      boolean isError = false;
+      String badnodes = "";
+
+      synchronized (datanodeMap) {
+        for (int i = 0; i < nodes.length; i++) {
+          boolean found = false;
+          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+               it.hasNext(); ) {
+            DatanodeDescriptor node = it.next();
+
+            //
+            // If this is a node that we are interested in, set its admin state.
+            //
+            if (node.getName().equals(nodes[i]) || 
+                node.getHost().equals(nodes[i])) {
+              found = true;
+              if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+                LOG.info("Start Decommissioning node " + node.name);
+                node.startDecommission();
+                //
+                // all the blocks that reside on this node have to be
+                // replicated.
+                Block decommissionBlocks[] = node.getBlocks();
+                for (int j = 0; j < decommissionBlocks.length; j++) {
+                  synchronized (neededReplications) {
+                    neededReplications.add(decommissionBlocks[j]);
+                  }
+                }
+              }
+              break;
+            }
+          }
+          //
+          // Record the fact that a specified node was not found
+          //
+          if (!found) {
+            badnodes += nodes[i] + " ";
+            isError = true;
+          }
+        }
+      }
+      if (isError) {
+        throw new IOException("Nodes " + badnodes + " not found");
+      }
+    }
+
+    /**
+     * Stop decommissioning the specified datanodes.
+     */
+    public synchronized void stopDecommission (String[] nodes) 
+                             throws IOException {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot decommission node ", safeMode);
+      }
+      boolean isError = false;
+      String badnodes = "";
+
+      synchronized (datanodeMap) {
+        for (int i = 0; i < nodes.length; i++) {
+          boolean found = false;
+          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+               it.hasNext(); ) {
+            DatanodeDescriptor node = it.next();
+
+            //
+            // If this is a node that we are interested in, set its admin state.
+            //
+            if (node.getName().equals(nodes[i]) || 
+                node.getHost().equals(nodes[i])) {
+              LOG.info("Stop Decommissioning node " + node.name);
+              found = true;
+              node.stopDecommission();
+              break;
+            }
+          }
+          //
+          // Record the fact that a specified node was not found
+          //
+          if (!found) {
+            badnodes += nodes[i] + " ";
+            isError = true;
+          }
+        }
+      }
+      if (isError) {
+        throw new IOException("Nodes " + badnodes + " not found");
+      }
+    }
+
+    /**
+     * Return true if all specified nodes are decommissioned.
+     * Otherwise return false.
+     */
+    public synchronized boolean checkDecommissioned (String[] nodes) 
+                                   throws IOException {
+      String badnodes = "";
+      boolean isError = false;
+
+      synchronized (datanodeMap) {
+        for (int i = 0; i < nodes.length; i++) {
+          boolean found = false;
+          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+               it.hasNext(); ) {
+            DatanodeDescriptor node = it.next();
+
+            //
+            // If this is a node that we are interested in, check its admin state.
+            //
+            if (node.getName().equals(nodes[i]) || 
+                node.getHost().equals(nodes[i])) {
+              found = true;
+              boolean isDecommissioned = checkDecommissionStateInternal(node);
+              if (!isDecommissioned) {
+                return false;
+              }
+            }
+          }
+          if (!found) {
+            badnodes += nodes[i] + " ";
+            isError = true;
+          }
+        }
+      }
+      if (isError) {
+        throw new IOException("Nodes " + badnodes + " not found");
+      }
+      return true;
+    }
+
     /** 
      */
     public DatanodeInfo getDataNodeInfo(String name) {
@@ -1896,6 +2045,72 @@
         return (Block[]) sendBlock.toArray(new Block[sendBlock.size()]);
     }
 
+    /*
+     * Counts the number of nodes in the given list. Skips over nodes
+     * that are marked for decommission.
+     */
+    private int countContainingNodes(Collection<DatanodeDescriptor> nodelist) {
+      int count = 0;
+      for (Iterator<DatanodeDescriptor> it = nodelist.iterator(); 
+           it.hasNext(); ) {
+        DatanodeDescriptor node = it.next();
+        if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+          count++;
+        }
+      }
+      return count;
+    }
+
+    /*
+     * Return true if there are any blocks in neededReplication that 
+     * reside on the specified node. Otherwise returns false.
+     */
+    private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
+      synchronized (neededReplications) {
+        for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){
+          Block block = it.next();
+          Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); 
+          if (containingNodes.contains(srcNode)) {
+              return true;
+          }
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Change, if appropriate, the admin state of a datanode to 
+     * decommission completed. Return true if decommission is complete.
+     */
+    private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+      //
+      // Check to see if there are any blocks in the neededReplication
+      // data structure that has a replica on the node being decommissioned.
+      //
+      if (node.isDecommissionInProgress()) {
+        if (!isReplicationInProgress(node)) {
+          node.setDecommissioned();
+          LOG.info("Decommission complete for node " + node.name);
+        }
+      }
+      if (node.isDecommissioned()) {
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Change, if appropriate, the admin state of a datanode to 
+     * decommission completed.
+     */
+    public synchronized void checkDecommissionState(DatanodeID nodeReg) {
+      DatanodeDescriptor node = datanodeMap.get(nodeReg.getStorageID());
+      if (node == null) {
+        return;
+      }
+      checkDecommissionStateInternal(node);
+    }
+
     /**
      * Return with a list of Block/DataNodeInfo sets, indicating
      * where various Blocks should be copied, ASAP.
@@ -1924,6 +2139,7 @@
         // replicate them.
         //
         List<Block> replicateBlocks = new ArrayList<Block>();
+        List<Integer> numCurrentReplicas = new ArrayList<Integer>();
         List<DatanodeDescriptor[]> replicateTargetSets;
         replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
         for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
@@ -1943,17 +2159,23 @@
             Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
             Collection<Block> excessBlocks = excessReplicateMap.get( 
                                                       srcNode.getStorageID() );
+
             // srcNode must contain the block, and the block must
             // not be scheduled for removal on that node
             if (containingNodes != null && containingNodes.contains(srcNode)
                 && (excessBlocks == null || ! excessBlocks.contains(block))) {
+
+              // filter out containingNodes that are marked for decommission.
+              int numCurrentReplica = countContainingNodes(containingNodes);
+
               DatanodeDescriptor targets[] = chooseTargets(
-                  Math.min( fileINode.getReplication() - containingNodes.size(),
+                  Math.min( fileINode.getReplication() - numCurrentReplica,
                             this.maxReplicationStreams - xmitsInProgress), 
                   containingNodes, null, blockSize);
               if (targets.length > 0) {
                 // Build items to return
                 replicateBlocks.add(block);
+                numCurrentReplicas.add(new Integer(numCurrentReplica));
                 replicateTargetSets.add(targets);
                 scheduledXfers += targets.length;
               }
@@ -1973,9 +2195,10 @@
             Block block = it.next();
             DatanodeDescriptor targets[] = 
                       (DatanodeDescriptor[]) replicateTargetSets.get(i);
+            int numCurrentReplica = numCurrentReplicas.get(i).intValue();
             Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
 
-            if (containingNodes.size() + targets.length >= 
+            if (numCurrentReplica + targets.length >= 
                     dir.getFileByBlock( block).getReplication() ) {
               neededReplications.remove(block);
               pendingReplications.add(block);
@@ -2060,7 +2283,8 @@
                  it.hasNext();) {
               DatanodeDescriptor node = it.next();
               if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
-                  clientMachine.toString().equals(node.getHost())) {
+                  clientMachine.toString().equals(node.getHost()) &&
+                  !node.isDecommissionInProgress() && !node.isDecommissioned()) {
                 if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
                     (node.getXceiverCount() <= (2.0 * avgLoad))) {
                   targets.add(node);
@@ -2084,6 +2308,7 @@
             DatanodeDescriptor node = heartbeats.get(idx);
             if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
                 !targets.contains(node) &&
+                !node.isDecommissionInProgress() && !node.isDecommissioned() &&
                 (node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
                 (node.getXceiverCount() <= (2.0 * avgLoad))) {
               target = node;
@@ -2100,6 +2325,7 @@
               DatanodeDescriptor node = heartbeats.get(idx);
               if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
                   !targets.contains(node) &&
+                  !node.isDecommissionInProgress() && !node.isDecommissioned() &&
                   node.getRemaining() >= blockSize) {
                 target = node;
                 break;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Jan  4 11:14:39 2007
@@ -465,6 +465,27 @@
     boolean isInSafeMode() {
       return namesystem.isInSafeMode();
     }
+
+    /**
+     * Set administrative commands to decommission datanodes.
+     */
+    public boolean decommission(DecommissionAction action, String[] nodes)
+                                throws IOException {
+      boolean ret = true;
+      switch (action) {
+        case DECOMMISSION_SET: // decommission datanode(s)
+          namesystem.startDecommission(nodes);
+          break;
+        case DECOMMISSION_CLEAR: // remove decommission state of a datanode
+          namesystem.stopDecommission(nodes);
+          break;
+        case DECOMMISSION_GET: // are all the nodes decommissioned?
+          ret = namesystem.checkDecommissioned(nodes);
+          break;
+      }
+      return ret;
+    }
+
     
     ////////////////////////////////////////////////////////////////
     // DatanodeProtocol
@@ -513,6 +534,14 @@
         if (blocks != null) {
             return new BlockCommand(blocks);
         }
+        //
+        // See if the decommissioning node has finished replicating all
+        // its data blocks to other nodes. This is a loose
+        // heuristic to determine when a decommission is really over.
+        // We could probably do this in a separate thread rather than making
+        // the heartbeat thread do it.
+        //
+        namesystem.checkDecommissionState(nodeReg);
         return null;
     }
 

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=auto&rev=492695
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Thu Jan  4 11:14:39 2007
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import java.net.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests the decommissioning of nodes.
+ * @author Dhruba Borthakur
+ */
+public class TestDecommission extends TestCase {
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  static final int fileSize = 16384;
+  static final int numDatanodes = 4;
+
+  Random myrand = new Random();
+
+  private void writeFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    // create and write a file that contains two blocks of data
+    FSOutputStream stm = fileSys.createRaw(name, true, (short)repl,
+        (long)blockSize);
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    stm.close();
+  }
+  
+  
+  private void checkFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+    for (int idx = 0; idx < locations.length; idx++) {
+      assertEquals("Number of replicas for block" + idx,
+          Math.min(numDatanodes, repl), locations[idx].length);  
+    }
+  }
+
+  /**
+   * For blocks that reside on the nodes that are down, verify that their
+   * replication factor is 1 more than the specified one.
+   */
+  private void checkFile(FileSystem fileSys, Path name, int repl,
+                         String[] downnodes) throws IOException {
+    FSInputStream is = fileSys.openRaw(name);
+    DFSClient.DFSInputStream dis = (DFSClient.DFSInputStream) is;
+    DatanodeInfo[][] dinfo = dis.getDataNodes();
+
+    for (int blk = 0; blk < dinfo.length; blk++) { // for each block
+      int hasdown = 0;
+      DatanodeInfo[] nodes = dinfo[blk];
+      for (int j = 0; j < nodes.length; j++) {     // for each replica
+        for (int k = 0; downnodes != null && k < downnodes.length; k++) {
+          if (nodes[j].getName().equals(downnodes[k])) {
+            hasdown++;
+            System.out.println("Block " + blk + " replica " +
+                               nodes[j].getName() + " is decommissioned.");
+          }
+        }
+      }
+      System.out.println("Block " + blk + " has " + hasdown +
+                           " decommissioned replica.");
+      assertEquals("Number of replicas for block" + blk,
+            Math.min(numDatanodes, repl+hasdown), nodes.length);  
+    }
+  }
+  
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name);
+    assertTrue(!fileSys.exists(name));
+  }
+
+  private void printDatanodeReport(DatanodeInfo[] info) {
+    System.out.println("-------------------------------------------------");
+    for (int i = 0; i < info.length; i++) {
+      System.out.println(info[i].getDatanodeReport());
+      System.out.println();
+    }
+  }
+
+  /*
+   * decommission one random node.
+   */
+  private String[] decommissionNode(DFSClient client, FileSystem filesys)
+                                    throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+    DatanodeInfo[] info = client.datanodeReport();
+
+    //
+    // pick one datanode randomly.
+    //
+    int index = myrand.nextInt(info.length);
+    String nodename = info[index].getName();
+    System.out.println("Decommissioning node: " + nodename);
+    String[] nodes = new String[1];
+    nodes[0] = nodename;
+    dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes);
+    return nodes;
+  }
+
+  /*
+   * put node back in action
+   */
+  private void commissionNode(DFSClient client, FileSystem filesys,
+                              String[] nodes) throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+    DatanodeInfo[] info = client.datanodeReport();
+
+    for (int i = 0; i < nodes.length; i++) {
+      System.out.println("Putting node back in action: " + nodes[i]);
+    }
+    dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_CLEAR, nodes);
+  }
+
+  /* 
+   * Check that node(s) were decommissioned
+   */
+  private void checkNodeDecommission(DFSClient client, FileSystem filesys,
+                                     String[] nodes) throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+    boolean ret = dfs.decommission(
+                    FSConstants.DecommissionAction.DECOMMISSION_GET, nodes);
+    assertEquals("State of Decommissioned Datanode(s) ", ret, true);
+  }
+
+  /* 
+   * Wait till node is fully decommissioned.
+   */
+  private void waitNodeDecommission(DFSClient client, FileSystem filesys,
+                                     String[] nodes) throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+    boolean done = dfs.decommission(
+                     FSConstants.DecommissionAction.DECOMMISSION_GET, nodes);
+    while (!done) {
+      System.out.println("Waiting for nodes " + nodes[0] +
+                         " to be fully decommissioned...");
+      try {
+        Thread.sleep(5000L);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_GET,
+                              nodes);
+    }
+    //
+    // sleep an additional 10 seconds for the blockreports from the datanodes
+    // to arrive. 
+    //
+    try {
+      Thread.sleep(10 * 1000L);
+    } catch (Exception e) {
+    }
+  }
+  
+  /**
+   * Tests Decommission in DFS.
+   */
+  public void testDecommission() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+    // Now wait for 15 seconds to give datanodes chance to register
+    // themselves and to report heartbeat
+    try {
+      Thread.sleep(15000L);
+    } catch (InterruptedException e) {
+      // nothing
+    }
+    InetSocketAddress addr = new InetSocketAddress("localhost", 
+                                             cluster.getNameNodePort());
+    DFSClient client = new DFSClient(addr, conf);
+    DatanodeInfo[] info = client.datanodeReport();
+    assertEquals("Number of Datanodes ", numDatanodes, info.length);
+    FileSystem fileSys = cluster.getFileSystem();
+    DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+
+    try {
+      for (int iteration = 0; iteration < 2; iteration++) {
+        //
+        // Decommission one node. Verify that node is decommissioned.
+        // Verify that replication factor of file has increased from 3
+        // to 4. This means one replica is on decommissioned node.
+        // 
+        Path file1 = new Path("smallblocktest.dat");
+        writeFile(fileSys, file1, 3);
+        checkFile(fileSys, file1, 3);
+        String downnodes[] = decommissionNode(client, fileSys);
+        waitNodeDecommission(client, fileSys, downnodes);
+        checkFile(fileSys, file1, 3, downnodes);
+        commissionNode(client, fileSys, downnodes);
+        cleanupFile(fileSys, file1);
+      }
+    } catch (IOException e) {
+      info = client.datanodeReport();
+      printDatanodeReport(info);
+      throw e;
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp Thu Jan  4 11:14:39 2007
@@ -82,9 +82,15 @@
       percentUsed = FsShell.limitDecimal(((1.0 * u)/c)*100, 2);
     else
       percentUsed = "100";
+
+    String adminState = (d.isDecommissioned() ? "Decommissioned" :
+                         (d.isDecommissionInProgress() ? "Decommission In Progress":
+                          "In Service"));
     
     out.print("<td class=\"lastcontact\"> " +
               ((currentTime - d.getLastUpdate())/1000) +
+	      "<td class=\"adminstate\">" +
+              adminState +
 	      "<td class=\"size\">" +
               FsShell.limitDecimal(c*1.0/diskBytes, 2) +
 	      "<td class=\"pcused\">" + percentUsed +
@@ -167,6 +173,7 @@
 	    out.print( "<tr class=\"headerRow\"> <th " +
                        NodeHeaderStr("name") + "> Node <th " +
                        NodeHeaderStr("lastcontact") + "> Last Contact <th " +
+                       NodeHeaderStr("adminstate") + "> Admin State <th " +
                        NodeHeaderStr("size") + "> Size (" + diskByteStr +
                        ") <th " + NodeHeaderStr("pcused") +
                        "> Used (%) <th " + NodeHeaderStr("blocks") +

Modified: lucene/hadoop/trunk/src/webapps/static/hadoop.css
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/static/hadoop.css?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/webapps/static/hadoop.css (original)
+++ lucene/hadoop/trunk/src/webapps/static/hadoop.css Thu Jan  4 11:14:39 2007
@@ -41,7 +41,7 @@
 	cursor : pointer;
 }
 
-div#dfsnodetable td.blocks, td.size, td.pcused, td.lastcontact {
+div#dfsnodetable td.blocks, td.size, td.pcused, td.adminstate, td.lastcontact {
 	text-align : right;
 }