Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2014/07/31 08:02:47 UTC

svn commit: r1614812 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/test/java/org/apache/hadoop/hdfs/server/balancer/

Author: arp
Date: Thu Jul 31 06:02:46 2014
New Revision: 1614812

URL: http://svn.apache.org/r1614812
Log:
HDFS-6441. Add ability to exclude/include specific datanodes while balancing. (Contributed by Benoy Antony and Yu Li)

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul 31 06:02:46 2014
@@ -335,6 +335,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6570. add api that enables checking if a user has certain permissions on
     a file. (Jitendra Pandey via cnauroth)
 
+    HDFS-6441. Add ability to exclude/include specific datanodes while
+    balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Jul 31 06:02:46 2014
@@ -45,6 +45,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -83,6 +84,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -203,12 +205,20 @@ public class Balancer {
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
       + BalancingPolicy.Pool.INSTANCE.getName()
-      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+      + "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]"
+      + "\tExcludes the specified datanodes."
+      + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
+      + "\tIncludes only the specified datanodes.";
   
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
   private final SaslDataTransferClient saslClient;
   private final double threshold;
+  // Set of datanodes to be excluded from balancing operations.
+  Set<String> nodesToBeExcluded;
+  // Restrict balancing to these datanodes only.
+  Set<String> nodesToBeIncluded;
   
   // all data node lists
   private final Collection<Source> overUtilizedDatanodes
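
For context, the new flags are meant to be used as follows (host names and port below are hypothetical; Balancer.Cli.parse is the same entry point the tests exercise):

    // Exclude two datanodes, one by host name and one by host:port:
    //   hdfs balancer -exclude datanodeY,datanodeZ:50010
    // Restrict balancing to the hosts listed in a file:
    //   hdfs balancer -include -f /path/to/include-hosts-file
    String[] args = { "-exclude", "datanodeY,datanodeZ:50010" };
    Balancer.Parameters p = Balancer.Cli.parse(args);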
@@ -869,6 +879,8 @@ public class Balancer {
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.nodesToBeExcluded = p.nodesToBeExcluded;
+    this.nodesToBeIncluded = p.nodesToBeIncluded;
     this.nnc = theblockpool;
     cluster = NetworkTopology.getInstance(conf);
 
@@ -905,8 +917,13 @@ public class Balancer {
   private long initNodes(DatanodeInfo[] datanodes) {
     // compute average utilization
     for (DatanodeInfo datanode : datanodes) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
+      // ignore decommissioned and decommissioning nodes, nodes in the
+      // exclude list, and nodes not in the include list (when the include
+      // list is non-empty)
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
+          Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
+          !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
+        continue;
       }
       policy.accumulateSpaces(datanode);
     }
@@ -919,8 +936,16 @@ public class Balancer {
      */  
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
+      // ignore decommissioned and decommissioning nodes, nodes in the
+      // exclude list, and nodes not in the include list (when the include
+      // list is non-empty)
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
+          Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
+          !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Excluding datanode " + datanode);
+        }
+        continue;
       }
       cluster.add(datanode);
       BalancerDatanode datanodeS;
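
Both loops above apply the same filter; restated as a standalone predicate (a paraphrase, not code from the patch):

    // A datanode takes part in balancing iff it is neither decommissioned nor
    // decommissioning, is not in the exclude list, and is in the include list
    // (or no include list was given).
    boolean participates = !datanode.isDecommissioned()
        && !datanode.isDecommissionInProgress()
        && !Util.shouldBeExcluded(nodesToBeExcluded, datanode)
        && Util.shouldBeIncluded(nodesToBeIncluded, datanode);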
@@ -1526,21 +1551,101 @@ public class Balancer {
   }
 
   static class Parameters {
-    static final Parameters DEFALUT = new Parameters(
-        BalancingPolicy.Node.INSTANCE, 10.0);
+    static final Parameters DEFAULT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0,
+        Collections.<String> emptySet(), Collections.<String> emptySet());
 
     final BalancingPolicy policy;
     final double threshold;
+    // exclude the nodes in this set from balancing operations
+    Set<String> nodesToBeExcluded;
+    // include only these nodes in balancing operations
+    Set<String> nodesToBeIncluded;
 
-    Parameters(BalancingPolicy policy, double threshold) {
+    Parameters(BalancingPolicy policy, double threshold,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
       this.policy = policy;
       this.threshold = threshold;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
     }
 
     @Override
     public String toString() {
       return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold + "]";
+          + "[" + policy + ", threshold=" + threshold +
+          ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
+          ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+    }
+  }
+
+  static class Util {
+
+    /**
+     * @param excludedNodes set of host names or host:port entries to exclude
+     * @param datanode the datanode to check
+     * @return true if the datanode is in excludedNodes.
+     */
+    static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
+      return isIn(excludedNodes, datanode);
+    }
+
+    /**
+     * @param includedNodes set of host names or host:port entries to include
+     * @param datanode the datanode to check
+     * @return true if includedNodes is empty or the datanode is in includedNodes.
+     */
+    static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
+      return (includedNodes.isEmpty() ||
+          isIn(includedNodes, datanode));
+    }
+
+    /**
+     * A match is checked against the host name and the IP address, each with
+     * and without the transfer port number.
+     * @param datanodeSet set of host names or host:port entries
+     * @param datanode the datanode to check
+     * @return true if the datanode's transfer address matches the set of nodes.
+     */
+    private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
+      return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
+    }
+
+    /**
+     * Returns true if nodes contains host or host:port.
+     * @param nodes set of host or host:port entries
+     * @param host host name or IP address
+     * @param port transfer port number
+     * @return true if nodes contains host or host:port
+     */
+    private static boolean isIn(Set<String> nodes, String host, int port) {
+      if (host == null) {
+        return false;
+      }
+      return (nodes.contains(host) || nodes.contains(host + ":" + port));
+    }
+
+    /**
+     * Parse a comma-separated string into a set of host names.
+     * @param string comma-separated list of hosts
+     * @return the set of host names
+     */
+    static Set<String> parseHostList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
+    /**
+     * Read a set of host names from a file.
+     * @param fileName path of the hosts file
+     * @return the set of host names
+     */
+    static Set<String> getHostListFromFile(String fileName) {
+      Set<String> nodes = new HashSet<String>();
+      try {
+        HostsFileReader.readFileToSet("nodes", fileName, nodes);
+        return StringUtils.getTrimmedStrings(nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Unable to open file: " + fileName);
+      }
     }
   }
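
To illustrate the matching rules: isIn tries the peer host name, the IP address, and the reported host name, each with and without the transfer port. With made-up values, a datanode at dn1.example.com / 10.0.0.1:50010 is matched by any of "dn1.example.com", "dn1.example.com:50010", "10.0.0.1", or "10.0.0.1:50010":

    Set<String> excluded = Util.parseHostList("dn1.example.com, 10.0.0.2:50010");
    // dn1 matches via its bare host name; a node on 10.0.0.2 matches only if
    // its transfer port is 50010, because that entry names an explicit port.
    boolean skip = Util.shouldBeExcluded(excluded, dn1Info); // dn1Info: some DatanodeInfo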
 
@@ -1578,8 +1683,10 @@ public class Balancer {
 
     /** parse command line arguments */
     static Parameters parse(String[] args) {
-      BalancingPolicy policy = Parameters.DEFALUT.policy;
-      double threshold = Parameters.DEFALUT.threshold;
+      BalancingPolicy policy = Parameters.DEFAULT.policy;
+      double threshold = Parameters.DEFAULT.threshold;
+      Set<String> nodesToBeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+      Set<String> nodesToBeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
 
       if (args != null) {
         try {
@@ -1608,18 +1715,38 @@ public class Balancer {
                 System.err.println("Illegal policy name: " + args[i]);
                 throw e;
               }
+            } else if ("-exclude".equalsIgnoreCase(args[i])) {
+              i++;
+              if ("-f".equalsIgnoreCase(args[i])) {
+                nodesTobeExcluded = Util.getHostListFromFile(args[++i]);
+              } else {
+                nodesTobeExcluded = Util.parseHostList(args[i]);
+              }
+            } else if ("-include".equalsIgnoreCase(args[i])) {
+              i++;
+              if ("-f".equalsIgnoreCase(args[i])) {
+                nodesTobeIncluded = Util.getHostListFromFile(args[++i]);
+               } else {
+                nodesTobeIncluded = Util.parseHostList(args[i]);
+              }
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
             }
           }
+          if (!nodesToBeExcluded.isEmpty() && !nodesToBeIncluded.isEmpty()) {
+            System.err.println(
+                "-exclude and -include options cannot be specified together.");
+            throw new IllegalArgumentException(
+                "-exclude and -include options cannot be specified together.");
+          }
         } catch(RuntimeException e) {
           printUsage(System.err);
           throw e;
         }
       }
       
-      return new Parameters(policy, threshold);
+      return new Parameters(policy, threshold, nodesToBeExcluded, nodesToBeIncluded);
     }
 
     private static void printUsage(PrintStream out) {
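
The file passed with -f is read through HostsFileReader.readFileToSet, so it holds plain host (or host:port) entries, one per line, which is also how the tests below write it. A hypothetical exclude-hosts-file:

    datanodeY
    datanodeZ:50010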

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Thu Jul 31 06:02:46 2014
@@ -18,17 +18,23 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -48,6 +54,8 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
+import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -255,6 +263,18 @@ public class TestBalancer {
       }
     }
   }
+
+  /**
+   * Wait until balanced: each datanode gives utilization within
+   * BALANCE_ALLOWED_VARIANCE of average
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)
+  throws IOException, TimeoutException {
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
+  }
   
   /**
    * Wait until balanced: each datanode gives utilization within 
@@ -263,11 +283,17 @@ public class TestBalancer {
    * @throws TimeoutException
    */
   static void waitForBalancer(long totalUsedSpace, long totalCapacity,
-      ClientProtocol client, MiniDFSCluster cluster)
-  throws IOException, TimeoutException {
+      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
+      int expectedExcludedNodes) throws IOException, TimeoutException {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
         : Time.now() + timeout;
+    if (!p.nodesToBeIncluded.isEmpty()) {
+      totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+    }
+    if (!p.nodesToBeExcluded.isEmpty()) {
+      totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+    }
     final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
     boolean balanced;
     do {
@@ -275,9 +301,20 @@ public class TestBalancer {
           client.getDatanodeReport(DatanodeReportType.ALL);
       assertEquals(datanodeReport.length, cluster.getDataNodes().size());
       balanced = true;
+      int actualExcludedNodeCount = 0;
       for (DatanodeInfo datanode : datanodeReport) {
         double nodeUtilization = ((double)datanode.getDfsUsed())
             / datanode.getCapacity();
+        if (Balancer.Util.shouldBeExcluded(p.nodesToBeExcluded, datanode)) {
+          assertTrue(nodeUtilization == 0);
+          actualExcludedNodeCount++;
+          continue;
+        }
+        if (!Balancer.Util.shouldBeIncluded(p.nodesToBeIncluded, datanode)) {
+          assertTrue(nodeUtilization == 0);
+          actualExcludedNodeCount++;
+          continue;
+        }
         if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
           balanced = false;
           if (Time.now() > failtime) {
@@ -294,6 +331,7 @@ public class TestBalancer {
           break;
         }
       }
+      assertEquals(expectedExcludedNodes, actualExcludedNodeCount);
     } while (!balanced);
   }
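
A quick worked check of the capacity adjustment above, with hypothetical numbers: 5 datanodes of CAPACITY each, 2 of them excluded.

    long totalCapacity = 5 * CAPACITY;  // reported by the mini cluster
    totalCapacity -= 2 * CAPACITY;      // the exclude branch above
    // avgUtilization is now computed over the 3 participating nodes only,
    // while each excluded node is asserted to stay at zero utilization.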
 
@@ -307,22 +345,118 @@ public class TestBalancer {
     }
     return b.append("]").toString();
   }
-  /** This test start a cluster with specified number of nodes, 
+  /**
+   * Class which contains information about the
+   * new nodes to be added to the cluster for balancing.
+   */
+  static abstract class NewNodeInfo {
+
+    Set<String> nodesToBeExcluded = new HashSet<String>();
+    Set<String> nodesToBeIncluded = new HashSet<String>();
+
+    abstract String[] getNames();
+    abstract int getNumberofNewNodes();
+    abstract int getNumberofIncludeNodes();
+    abstract int getNumberofExcludeNodes();
+
+    public Set<String> getNodesToBeIncluded() {
+      return nodesToBeIncluded;
+    }
+    public Set<String> getNodesToBeExcluded() {
+      return nodesToBeExcluded;
+    }
+  }
+
+  /**
+   * The host names of the new nodes are specified explicitly.
+   */
+  static class HostNameBasedNodes extends NewNodeInfo {
+    String[] hostnames;
+
+    public HostNameBasedNodes(String[] hostnames,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+      this.hostnames = hostnames;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
+    }
+
+    @Override
+    String[] getNames() {
+      return hostnames;
+    }
+    @Override
+    int getNumberofNewNodes() {
+      return hostnames.length;
+    }
+    @Override
+    int getNumberofIncludeNodes() {
+      return nodesToBeIncluded.size();
+    }
+    @Override
+    int getNumberofExcludeNodes() {
+      return nodesToBeExcluded.size();
+    }
+  }
+
+  /**
+   * Only the number of datanodes to start is specified; the datanodes will
+   * have the same host name but different port numbers.
+   */
+  static class PortNumberBasedNodes extends NewNodeInfo {
+    int newNodes;
+    int excludeNodes;
+    int includeNodes;
+
+    public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) {
+      this.newNodes = newNodes;
+      this.excludeNodes = excludeNodes;
+      this.includeNodes = includeNodes;
+    }
+
+    @Override
+    String[] getNames() {
+      return null;
+    }
+    @Override
+    int getNumberofNewNodes() {
+      return newNodes;
+    }
+    @Override
+    int getNumberofIncludeNodes() {
+      return includeNodes;
+    }
+    @Override
+    int getNumberofExcludeNodes() {
+      return excludeNodes;
+    }
+  }
+
+  private void doTest(Configuration conf, long[] capacities, String[] racks,
+      long newCapacity, String newRack, boolean useTool) throws Exception {
+    doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
+  }
+
+  /** This test starts a cluster with the specified number of nodes,
    * and fills it to be 30% full (with a single file replicated identically
    * to all datanodes);
    * It then adds one new empty node and starts balancing.
-   * 
+   *
    * @param conf - configuration
    * @param capacities - array of capacities of original nodes in cluster
    * @param racks - array of racks for original nodes in cluster
    * @param newCapacity - new node's capacity
    * @param newRack - new node's rack
+   * @param nodes - information about new nodes to be started.
    * @param useTool - if true run test via Cli with command-line argument 
    *   parsing, etc.   Otherwise invoke balancer API directly.
+   * @param useFile - if true, the hosts to be included or excluded will be
+   *   stored in a file and later read back from that file.
    * @throws Exception
    */
-  private void doTest(Configuration conf, long[] capacities, String[] racks, 
-      long newCapacity, String newRack, boolean useTool) throws Exception {
+  private void doTest(Configuration conf, long[] capacities,
+      String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
+      boolean useTool, boolean useFile) throws Exception {
     LOG.info("capacities = " +  long2String(capacities)); 
     LOG.info("racks      = " +  Arrays.asList(racks)); 
     LOG.info("newCapacity= " +  newCapacity); 
@@ -346,17 +480,75 @@ public class TestBalancer {
       long totalUsedSpace = totalCapacity*3/10;
       createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
           (short) numOfDatanodes, 0);
-      // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null,
-          new String[]{newRack}, new long[]{newCapacity});
 
-      totalCapacity += newCapacity;
+      if (nodes == null) { // there is no specification of new nodes.
+        // start up an empty node with the same capacity and on the same rack
+        cluster.startDataNodes(conf, 1, true, null,
+            new String[]{newRack}, null, new long[]{newCapacity});
+        totalCapacity += newCapacity;
+      } else {
+        // if running a test with an include list, include the original nodes as well
+        if (nodes.getNumberofIncludeNodes() > 0) {
+          for (DataNode dn : cluster.getDataNodes()) {
+            nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName());
+          }
+        }
+        String[] newRacks = new String[nodes.getNumberofNewNodes()];
+        long[] newCapacities = new long[nodes.getNumberofNewNodes()];
+        for (int i=0; i < nodes.getNumberofNewNodes(); i++) {
+          newRacks[i] = newRack;
+          newCapacities[i] = newCapacity;
+        }
+        // if host names are specified for the new nodes to be created.
+        if (nodes.getNames() != null) {
+          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+              newRacks, nodes.getNames(), newCapacities);
+          totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+        } else {  // host names are not specified
+          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+              newRacks, null, newCapacities);
+          totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+          // populate the include nodes
+          if (nodes.getNumberofIncludeNodes() > 0) {
+            int totalNodes = cluster.getDataNodes().size();
+            for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) {
+              nodes.getNodesToBeIncluded().add(cluster.getDataNodes().get(
+                  totalNodes-1-i).getDatanodeId().getXferAddr());
+            }
+          }
+          // populate the exclude nodes
+          if (nodes.getNumberofExcludeNodes() > 0) {
+            int totalNodes = cluster.getDataNodes().size();
+            for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) {
+              nodes.getNodesToBeExcluded().add(cluster.getDataNodes().get(
+                  totalNodes-1-i).getDatanodeId().getXferAddr());
+            }
+          }
+        }
+      }
+      // build balancer parameters from the node specification
+      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+      if (nodes != null) {
+        p = new Balancer.Parameters(
+            Balancer.Parameters.DEFAULT.policy,
+            Balancer.Parameters.DEFAULT.threshold,
+            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+      }
+
+      int expectedExcludedNodes = 0;
+      if (nodes != null) {
+        if (!nodes.getNodesToBeExcluded().isEmpty()) {
+          expectedExcludedNodes = nodes.getNodesToBeExcluded().size();
+        } else if (!nodes.getNodesToBeIncluded().isEmpty()) {
+          expectedExcludedNodes =
+              cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size();
+        }
+      }
 
       // run balancer and validate results
       if (useTool) {
-        runBalancerCli(conf, totalUsedSpace, totalCapacity);
+        runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
       } else {
-        runBalancer(conf, totalUsedSpace, totalCapacity);
+        runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
       }
     } finally {
       cluster.shutdown();
@@ -365,11 +557,17 @@ public class TestBalancer {
 
   private void runBalancer(Configuration conf,
       long totalUsedSpace, long totalCapacity) throws Exception {
+    runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0);
+  }
+
+  private void runBalancer(Configuration conf,
+     long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
+     int excludedNodes) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    final int r = Balancer.run(namenodes, p, conf);
     if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
       assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
@@ -379,22 +577,66 @@ public class TestBalancer {
     }
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
-    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
   }
-  
+
   private void runBalancerCli(Configuration conf,
-      long totalUsedSpace, long totalCapacity) throws Exception {
+      long totalUsedSpace, long totalCapacity,
+      Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+    List <String> args = new ArrayList<String>();
+    args.add("-policy");
+    args.add("datanode");
+
+    File excludeHostsFile = null;
+    if (!p.nodesToBeExcluded.isEmpty()) {
+      args.add("-exclude");
+      if (useFile) {
+        excludeHostsFile = new File("exclude-hosts-file");
+        PrintWriter pw = new PrintWriter(excludeHostsFile);
+        for (String host : p.nodesToBeExcluded) {
+          pw.write(host + "\n");
+        }
+        pw.close();
+        args.add("-f");
+        args.add("exclude-hosts-file");
+      } else {
+        args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+      }
+    }
+
+    File includeHostsFile = null;
+    if (!p.nodesToBeIncluded.isEmpty()) {
+      args.add("-include");
+      if (useFile) {
+        includeHostsFile = new File("include-hosts-file");
+        PrintWriter pw = new PrintWriter(includeHostsFile);
+        for (String host : p.nodesToBeIncluded) {
+          pw.write(host + "\n");
+        }
+        pw.close();
+        args.add("-f");
+        args.add("include-hosts-file");
+      } else {
+        args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+      }
+    }
 
-    final String[] args = { "-policy", "datanode" };
     final Tool tool = new Cli();    
     tool.setConf(conf);
-    final int r = tool.run(args); // start rebalancing
+    final int r = tool.run(args.toArray(new String[0])); // start rebalancing
     
     assertEquals("Tools should exit 0 on success", 0, r);
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
-    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes);
+
+    if (excludeHostsFile != null && excludeHostsFile.exists()) {
+      excludeHostsFile.delete();
+    }
+    if (includeHostsFile != null && includeHostsFile.exists()) {
+      includeHostsFile.delete();
+    }
   }
   
   /** one-node cluster test*/
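
For example, with useFile=false and two excluded hosts (hypothetical names), the Cli receives:

    // args = [-policy, datanode, -exclude, datanodeY,datanodeZ]

whereas with useFile=true the comma-separated list is replaced by "-f exclude-hosts-file", the hosts are written to that file one per line, and the file is deleted after the run.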
@@ -440,7 +682,7 @@ public class TestBalancer {
     }
   }
   
-  /** Test a cluster with even distribution, 
+  /** Test a cluster with even distribution,
    * then a new empty node is added to the cluster*/
   @Test(timeout=100000)
   public void testBalancer0() throws Exception {
@@ -554,7 +796,13 @@ public class TestBalancer {
     } catch (IllegalArgumentException e) {
 
     }
+    parameters = new String[] {"-include",  "testnode1", "-exclude", "testnode2"};
+    try {
+      Balancer.Cli.parse(parameters);
+      fail("IllegalArgumentException is expected when both -exclude and -include are specified");
+    } catch (IllegalArgumentException e) {
 
+    }
   }
 
   /**
@@ -570,6 +818,183 @@ public class TestBalancer {
   }
   
   /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with two of the new nodes in the exclude list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithExcludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add( "datanodeY");
+    excludeHosts.add( "datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with two of the new nodes in the exclude list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithExcludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with two of the new nodes in the exclude list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add( "datanodeY");
+    excludeHosts.add( "datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+      new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
+      Parameters.DEFAULT.nodesToBeIncluded), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with two of the new nodes in the exclude list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with two of the new nodes in an exclude list read from a file.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add( "datanodeY");
+    excludeHosts.add( "datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with two of the new nodes in an exclude list read from a file.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with one of the new nodes in the include list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithIncludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add( "datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with one of the new nodes in the include list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithIncludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with one of the new nodes in the include list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add( "datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with one of the new nodes in the include list.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with one of the new nodes in an include list read from a file.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add( "datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution;
+   * three nodes are then added to the cluster, and the balancer is run
+   * with one of the new nodes in an include list read from a file.
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
+  }
+
+  /**
    * @param args
    */
   public static void main(String[] args) throws Exception {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java Thu Jul 31 06:02:46 2014
@@ -97,10 +97,10 @@ public class TestBalancerWithHANameNodes
       Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
       assertEquals(1, namenodes.size());
       assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
-      final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+      final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
       assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
       TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
-          cluster);
+          cluster, Balancer.Parameters.DEFAULT);
     } finally {
       cluster.shutdown();
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Thu Jul 31 06:02:46 2014
@@ -159,7 +159,7 @@ public class TestBalancerWithMultipleNam
 
     // start rebalancing
     final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
     Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
     LOG.info("BALANCER 2");
@@ -195,7 +195,7 @@ public class TestBalancerWithMultipleNam
       balanced = true;
       for(int d = 0; d < used.length; d++) {
         final double p = used[d]*100.0/cap[d];
-        balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold;
+        balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold;
         if (!balanced) {
           if (i % 100 == 0) {
             LOG.warn("datanodes " + d + " is not yet balanced: "

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Thu Jul 31 06:02:46 2014
@@ -175,7 +175,7 @@ public class TestBalancerWithNodeGroup {
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
     assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
@@ -189,7 +189,7 @@ public class TestBalancerWithNodeGroup {
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
     Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
         (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
     waitForHeartBeat(totalUsedSpace, totalCapacity);