You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2014/07/31 08:02:47 UTC
svn commit: r1614812 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/balancer/
src/test/java/org/apache/hadoop/hdfs/server/balancer/
Author: arp
Date: Thu Jul 31 06:02:46 2014
New Revision: 1614812
URL: http://svn.apache.org/r1614812
Log:
HDFS-6441. Add ability to exclude/include specific datanodes while balancing. (Contributed by Benoy Antony and Yu Li)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul 31 06:02:46 2014
@@ -335,6 +335,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6570. add api that enables checking if a user has certain permissions on
a file. (Jitendra Pandey via cnauroth)
+ HDFS-6441. Add ability to exclude/include specific datanodes while
+ balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Jul 31 06:02:46 2014
@@ -45,6 +45,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -83,6 +84,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -203,12 +205,20 @@ public class Balancer {
+ "\n\t[-policy <policy>]\tthe balancing policy: "
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
+ BalancingPolicy.Pool.INSTANCE.getName()
- + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+ + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+ + "\n\t[-exclude [-f <hosts-file> | comma-separated list of hosts]]"
+ + "\tExcludes the specified datanodes."
+ + "\n\t[-include [-f <hosts-file> | comma-separated list of hosts]]"
+ + "\tIncludes only the specified datanodes.";
private final NameNodeConnector nnc;
private final BalancingPolicy policy;
private final SaslDataTransferClient saslClient;
private final double threshold;
+ // set of data nodes to be excluded from balancing operations.
+ Set<String> nodesToBeExcluded;
+ //Restrict balancing to the following nodes.
+ Set<String> nodesToBeIncluded;
// all data node lists
private final Collection<Source> overUtilizedDatanodes
@@ -869,6 +879,8 @@ public class Balancer {
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
this.threshold = p.threshold;
this.policy = p.policy;
+ this.nodesToBeExcluded = p.nodesToBeExcluded;
+ this.nodesToBeIncluded = p.nodesToBeIncluded;
this.nnc = theblockpool;
cluster = NetworkTopology.getInstance(conf);
@@ -905,8 +917,13 @@ public class Balancer {
private long initNodes(DatanodeInfo[] datanodes) {
// compute average utilization
for (DatanodeInfo datanode : datanodes) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
- continue; // ignore decommissioning or decommissioned nodes
+ // ignore decommissioning or decommissioned nodes or
+ // ignore nodes in exclude list
+ // or nodes not in the include list (if include list is not empty)
+ if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
+ Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
+ !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
+ continue;
}
policy.accumulateSpaces(datanode);
}
@@ -919,8 +936,16 @@ public class Balancer {
*/
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
- continue; // ignore decommissioning or decommissioned nodes
+ // ignore decommissioning or decommissioned nodes or
+ // ignore nodes in exclude list
+ // or nodes not in the include list (if include list is not empty)
+ if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
+ Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
+ !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Excluding datanode " + datanode);
+ }
+ continue;
}
cluster.add(datanode);
BalancerDatanode datanodeS;
@@ -1526,21 +1551,101 @@ public class Balancer {
}
static class Parameters {
- static final Parameters DEFALUT = new Parameters(
- BalancingPolicy.Node.INSTANCE, 10.0);
+ static final Parameters DEFAULT = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 10.0,
+ Collections.<String> emptySet(), Collections.<String> emptySet());
final BalancingPolicy policy;
final double threshold;
+ // exclude the nodes in this set from balancing operations
+ Set<String> nodesToBeExcluded;
+ //include only these nodes in balancing operations
+ Set<String> nodesToBeIncluded;
- Parameters(BalancingPolicy policy, double threshold) {
+ Parameters(BalancingPolicy policy, double threshold,
+ Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
this.policy = policy;
this.threshold = threshold;
+ this.nodesToBeExcluded = nodesToBeExcluded;
+ this.nodesToBeIncluded = nodesToBeIncluded;
}
@Override
public String toString() {
return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
- + "[" + policy + ", threshold=" + threshold + "]";
+ + "[" + policy + ", threshold=" + threshold +
+ ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
+ ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+ }
+ }
+
+ static class Util {
+
+ /**
+ * @param datanode
+ * @return returns true if data node is part of the excludedNodes.
+ */
+ static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
+ return isIn(excludedNodes, datanode);
+ }
+
+ /**
+ * @param datanode
+ * @return returns true if includedNodes is empty or data node is part of the includedNodes.
+ */
+ static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
+ return (includedNodes.isEmpty() ||
+ isIn(includedNodes, datanode));
+ }
+ /**
+ * Match is checked using host name and IP address, with and without port number.
+ * @param datanodeSet
+ * @param datanode
+ * @return true if the datanode's transfer address matches the set of nodes.
+ */
+ private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
+ return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
+ isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
+ isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
+ }
+
+ /**
+ * returns true if nodes contains host or host:port
+ * @param nodes
+ * @param host
+ * @param port
+ * @return
+ */
+ private static boolean isIn(Set<String> nodes, String host, int port) {
+ if (host == null) {
+ return false;
+ }
+ return (nodes.contains(host) || nodes.contains(host +":"+ port));
+ }
+
+ /**
+ * parse a comma separated string to obtain set of host names
+ * @param string
+ * @return
+ */
+ static Set<String> parseHostList(String string) {
+ String[] addrs = StringUtils.getTrimmedStrings(string);
+ return new HashSet<String>(Arrays.asList(addrs));
+ }
+
+ /**
+ * read set of host names from a file
+ * @param fileName
+ * @return
+ */
+ static Set<String> getHostListFromFile(String fileName) {
+ Set<String> nodes = new HashSet <String> ();
+ try {
+ HostsFileReader.readFileToSet("nodes", fileName, nodes);
+ return StringUtils.getTrimmedStrings(nodes);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Unable to open file: " + fileName);
+ }
}
}
@@ -1578,8 +1683,10 @@ public class Balancer {
/** parse command line arguments */
static Parameters parse(String[] args) {
- BalancingPolicy policy = Parameters.DEFALUT.policy;
- double threshold = Parameters.DEFALUT.threshold;
+ BalancingPolicy policy = Parameters.DEFAULT.policy;
+ double threshold = Parameters.DEFAULT.threshold;
+ Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+ Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
if (args != null) {
try {
@@ -1608,18 +1715,38 @@ public class Balancer {
System.err.println("Illegal policy name: " + args[i]);
throw e;
}
+ } else if ("-exclude".equalsIgnoreCase(args[i])) {
+ i++;
+ if ("-f".equalsIgnoreCase(args[i])) {
+ nodesTobeExcluded = Util.getHostListFromFile(args[++i]);
+ } else {
+ nodesTobeExcluded = Util.parseHostList(args[i]);
+ }
+ } else if ("-include".equalsIgnoreCase(args[i])) {
+ i++;
+ if ("-f".equalsIgnoreCase(args[i])) {
+ nodesTobeIncluded = Util.getHostListFromFile(args[++i]);
+ } else {
+ nodesTobeIncluded = Util.parseHostList(args[i]);
+ }
} else {
throw new IllegalArgumentException("args = "
+ Arrays.toString(args));
}
}
+ if (!nodesTobeExcluded.isEmpty() && !nodesTobeIncluded.isEmpty()) {
+ System.err.println(
+ "-exclude and -include options cannot be specified together.");
+ throw new IllegalArgumentException(
+ "-exclude and -include options cannot be specified together.");
+ }
} catch(RuntimeException e) {
printUsage(System.err);
throw e;
}
}
- return new Parameters(policy, threshold);
+ return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
}
private static void printUsage(PrintStream out) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Thu Jul 31 06:02:46 2014
@@ -18,17 +18,23 @@
package org.apache.hadoop.hdfs.server.balancer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -48,6 +54,8 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
+import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -255,6 +263,18 @@ public class TestBalancer {
}
}
}
+
+ /**
+ * Wait until balanced: each datanode gives utilization within
+ * BALANCE_ALLOWED_VARIANCE of average
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+ ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)
+ throws IOException, TimeoutException {
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
+ }
/**
* Wait until balanced: each datanode gives utilization within
@@ -263,11 +283,17 @@ public class TestBalancer {
* @throws TimeoutException
*/
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
- ClientProtocol client, MiniDFSCluster cluster)
- throws IOException, TimeoutException {
+ ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
+ int expectedExcludedNodes) throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.now() + timeout;
+ if (!p.nodesToBeIncluded.isEmpty()) {
+ totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+ }
+ if (!p.nodesToBeExcluded.isEmpty()) {
+ totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+ }
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
boolean balanced;
do {
@@ -275,9 +301,20 @@ public class TestBalancer {
client.getDatanodeReport(DatanodeReportType.ALL);
assertEquals(datanodeReport.length, cluster.getDataNodes().size());
balanced = true;
+ int actualExcludedNodeCount = 0;
for (DatanodeInfo datanode : datanodeReport) {
double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity();
+ if (Balancer.Util.shouldBeExcluded(p.nodesToBeExcluded, datanode)) {
+ assertTrue(nodeUtilization == 0);
+ actualExcludedNodeCount++;
+ continue;
+ }
+ if (!Balancer.Util.shouldBeIncluded(p.nodesToBeIncluded, datanode)) {
+ assertTrue(nodeUtilization == 0);
+ actualExcludedNodeCount++;
+ continue;
+ }
if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
balanced = false;
if (Time.now() > failtime) {
@@ -294,6 +331,7 @@ public class TestBalancer {
break;
}
}
+ assertEquals(expectedExcludedNodes,actualExcludedNodeCount);
} while (!balanced);
}
@@ -307,22 +345,118 @@ public class TestBalancer {
}
return b.append("]").toString();
}
- /** This test start a cluster with specified number of nodes,
+ /**
+ * Class which contains information about the
+ * new nodes to be added to the cluster for balancing.
+ */
+ static abstract class NewNodeInfo {
+
+ Set<String> nodesToBeExcluded = new HashSet<String>();
+ Set<String> nodesToBeIncluded = new HashSet<String>();
+
+ abstract String[] getNames();
+ abstract int getNumberofNewNodes();
+ abstract int getNumberofIncludeNodes();
+ abstract int getNumberofExcludeNodes();
+
+ public Set<String> getNodesToBeIncluded() {
+ return nodesToBeIncluded;
+ }
+ public Set<String> getNodesToBeExcluded() {
+ return nodesToBeExcluded;
+ }
+ }
+
+ /**
+ * The host names of new nodes are specified
+ */
+ static class HostNameBasedNodes extends NewNodeInfo {
+ String[] hostnames;
+
+ public HostNameBasedNodes(String[] hostnames,
+ Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+ this.hostnames = hostnames;
+ this.nodesToBeExcluded = nodesToBeExcluded;
+ this.nodesToBeIncluded = nodesToBeIncluded;
+ }
+
+ @Override
+ String[] getNames() {
+ return hostnames;
+ }
+ @Override
+ int getNumberofNewNodes() {
+ return hostnames.length;
+ }
+ @Override
+ int getNumberofIncludeNodes() {
+ return nodesToBeIncluded.size();
+ }
+ @Override
+ int getNumberofExcludeNodes() {
+ return nodesToBeExcluded.size();
+ }
+ }
+
+ /**
+ * The number of data nodes to be started are specified.
+ * The data nodes will have same host name, but different port numbers.
+ *
+ */
+ static class PortNumberBasedNodes extends NewNodeInfo {
+ int newNodes;
+ int excludeNodes;
+ int includeNodes;
+
+ public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) {
+ this.newNodes = newNodes;
+ this.excludeNodes = excludeNodes;
+ this.includeNodes = includeNodes;
+ }
+
+ @Override
+ String[] getNames() {
+ return null;
+ }
+ @Override
+ int getNumberofNewNodes() {
+ return newNodes;
+ }
+ @Override
+ int getNumberofIncludeNodes() {
+ return includeNodes;
+ }
+ @Override
+ int getNumberofExcludeNodes() {
+ return excludeNodes;
+ }
+ }
+
+ private void doTest(Configuration conf, long[] capacities, String[] racks,
+ long newCapacity, String newRack, boolean useTool) throws Exception {
+ doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
+ }
+
+ /** This test start a cluster with specified number of nodes,
* and fills it to be 30% full (with a single file replicated identically
* to all datanodes);
* It then adds one new empty node and starts balancing.
- *
+ *
* @param conf - configuration
* @param capacities - array of capacities of original nodes in cluster
* @param racks - array of racks for original nodes in cluster
* @param newCapacity - new node's capacity
* @param newRack - new node's rack
+ * @param nodes - information about new nodes to be started.
* @param useTool - if true run test via Cli with command-line argument
* parsing, etc. Otherwise invoke balancer API directly.
+ * @param useFile - if true, the hosts to included or excluded will be stored in a
+ * file and then later read from the file.
* @throws Exception
*/
- private void doTest(Configuration conf, long[] capacities, String[] racks,
- long newCapacity, String newRack, boolean useTool) throws Exception {
+ private void doTest(Configuration conf, long[] capacities,
+ String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
+ boolean useTool, boolean useFile) throws Exception {
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
@@ -346,17 +480,75 @@ public class TestBalancer {
long totalUsedSpace = totalCapacity*3/10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
- // start up an empty node with the same capacity and on the same rack
- cluster.startDataNodes(conf, 1, true, null,
- new String[]{newRack}, new long[]{newCapacity});
- totalCapacity += newCapacity;
+ if (nodes == null) { // there is no specification of new nodes.
+ // start up an empty node with the same capacity and on the same rack
+ cluster.startDataNodes(conf, 1, true, null,
+ new String[]{newRack}, null,new long[]{newCapacity});
+ totalCapacity += newCapacity;
+ } else {
+ //if running a test with "include list", include original nodes as well
+ if (nodes.getNumberofIncludeNodes()>0) {
+ for (DataNode dn: cluster.getDataNodes())
+ nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName());
+ }
+ String[] newRacks = new String[nodes.getNumberofNewNodes()];
+ long[] newCapacities = new long[nodes.getNumberofNewNodes()];
+ for (int i=0; i < nodes.getNumberofNewNodes(); i++) {
+ newRacks[i] = newRack;
+ newCapacities[i] = newCapacity;
+ }
+ // if host names are specified for the new nodes to be created.
+ if (nodes.getNames() != null) {
+ cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+ newRacks, nodes.getNames(), newCapacities);
+ totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+ } else { // host names are not specified
+ cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+ newRacks, null, newCapacities);
+ totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+ //populate the include nodes
+ if (nodes.getNumberofIncludeNodes() > 0) {
+ int totalNodes = cluster.getDataNodes().size();
+ for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) {
+ nodes.getNodesToBeIncluded().add (cluster.getDataNodes().get(
+ totalNodes-1-i).getDatanodeId().getXferAddr());
+ }
+ }
+ //populate the exclude nodes
+ if (nodes.getNumberofExcludeNodes() > 0) {
+ int totalNodes = cluster.getDataNodes().size();
+ for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) {
+ nodes.getNodesToBeExcluded().add (cluster.getDataNodes().get(
+ totalNodes-1-i).getDatanodeId().getXferAddr());
+ }
+ }
+ }
+ }
+ // run balancer and validate results
+ Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+ if (nodes != null) {
+ p = new Balancer.Parameters(
+ Balancer.Parameters.DEFAULT.policy,
+ Balancer.Parameters.DEFAULT.threshold,
+ nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+ }
+
+ int expectedExcludedNodes = 0;
+ if (nodes != null) {
+ if (!nodes.getNodesToBeExcluded().isEmpty()) {
+ expectedExcludedNodes = nodes.getNodesToBeExcluded().size();
+ } else if (!nodes.getNodesToBeIncluded().isEmpty()) {
+ expectedExcludedNodes =
+ cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size();
+ }
+ }
// run balancer and validate results
if (useTool) {
- runBalancerCli(conf, totalUsedSpace, totalCapacity);
+ runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
} else {
- runBalancer(conf, totalUsedSpace, totalCapacity);
+ runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
}
} finally {
cluster.shutdown();
@@ -365,11 +557,17 @@ public class TestBalancer {
private void runBalancer(Configuration conf,
long totalUsedSpace, long totalCapacity) throws Exception {
+ runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0);
+ }
+
+ private void runBalancer(Configuration conf,
+ long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
+ int excludedNodes) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+ final int r = Balancer.run(namenodes, p, conf);
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
@@ -379,22 +577,66 @@ public class TestBalancer {
}
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor.");
- waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
}
-
+
private void runBalancerCli(Configuration conf,
- long totalUsedSpace, long totalCapacity) throws Exception {
+ long totalUsedSpace, long totalCapacity,
+ Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+ List <String> args = new ArrayList<String>();
+ args.add("-policy");
+ args.add("datanode");
+
+ File excludeHostsFile = null;
+ if (!p.nodesToBeExcluded.isEmpty()) {
+ args.add("-exclude");
+ if (useFile) {
+ excludeHostsFile = new File ("exclude-hosts-file");
+ PrintWriter pw = new PrintWriter(excludeHostsFile);
+ for (String host: p.nodesToBeExcluded) {
+ pw.write( host + "\n");
+ }
+ pw.close();
+ args.add("-f");
+ args.add("exclude-hosts-file");
+ } else {
+ args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+ }
+ }
+
+ File includeHostsFile = null;
+ if (!p.nodesToBeIncluded.isEmpty()) {
+ args.add("-include");
+ if (useFile) {
+ includeHostsFile = new File ("include-hosts-file");
+ PrintWriter pw = new PrintWriter(includeHostsFile);
+ for (String host: p.nodesToBeIncluded){
+ pw.write( host + "\n");
+ }
+ pw.close();
+ args.add("-f");
+ args.add("include-hosts-file");
+ } else {
+ args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+ }
+ }
- final String[] args = { "-policy", "datanode" };
final Tool tool = new Cli();
tool.setConf(conf);
- final int r = tool.run(args); // start rebalancing
+ final int r = tool.run(args.toArray(new String[0])); // start rebalancing
assertEquals("Tools should exit 0 on success", 0, r);
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor.");
- waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes);
+
+ if (excludeHostsFile != null && excludeHostsFile.exists()) {
+ excludeHostsFile.delete();
+ }
+ if (includeHostsFile != null && includeHostsFile.exists()) {
+ includeHostsFile.delete();
+ }
}
/** one-node cluster test*/
@@ -440,7 +682,7 @@ public class TestBalancer {
}
}
- /** Test a cluster with even distribution,
+ /** Test a cluster with even distribution,
* then a new empty node is added to the cluster*/
@Test(timeout=100000)
public void testBalancer0() throws Exception {
@@ -554,7 +796,13 @@ public class TestBalancer {
} catch (IllegalArgumentException e) {
}
+ parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"};
+ try {
+ Balancer.Cli.parse(parameters);
+ fail("IllegalArgumentException is expected when both -exclude and -include are specified");
+ } catch (IllegalArgumentException e) {
+ }
}
/**
@@ -570,6 +818,183 @@ public class TestBalancer {
}
/**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithExcludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> excludeHosts = new HashSet<String>();
+ excludeHosts.add( "datanodeY");
+ excludeHosts.add( "datanodeZ");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithExcludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> excludeHosts = new HashSet<String>();
+ excludeHosts.add( "datanodeY");
+ excludeHosts.add( "datanodeZ");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
+ Parameters.DEFAULT.nodesToBeIncluded), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list in a file
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeListInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> excludeHosts = new HashSet<String>();
+ excludeHosts.add( "datanodeY");
+ excludeHosts.add( "datanodeZ");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the exclude list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithIncludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> includeHosts = new HashSet<String>();
+ includeHosts.add( "datanodeY");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithIncludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeList() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> includeHosts = new HashSet<String>();
+ includeHosts.add( "datanodeY");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeListWithPorts() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeListInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ Set<String> includeHosts = new HashSet<String>();
+ includeHosts.add( "datanodeY");
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+ new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+ Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+ }
+
+ /**
+ * Test a cluster with even distribution,
+ * then three nodes are added to the cluster,
+ * runs balancer with two of the nodes in the include list
+ */
+ @Test(timeout=100000)
+ public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
+ }
+
+ /**
* @param args
*/
public static void main(String[] args) throws Exception {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java Thu Jul 31 06:02:46 2014
@@ -97,10 +97,10 @@ public class TestBalancerWithHANameNodes
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
- cluster);
+ cluster, Balancer.Parameters.DEFAULT);
} finally {
cluster.shutdown();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Thu Jul 31 06:02:46 2014
@@ -159,7 +159,7 @@ public class TestBalancerWithMultipleNam
// start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
LOG.info("BALANCER 2");
@@ -195,7 +195,7 @@ public class TestBalancerWithMultipleNam
balanced = true;
for(int d = 0; d < used.length; d++) {
final double p = used[d]*100.0/cap[d];
- balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold;
+ balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold;
if (!balanced) {
if (i % 100 == 0) {
LOG.warn("datanodes " + d + " is not yet balanced: "
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1614812&r1=1614811&r2=1614812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Thu Jul 31 06:02:46 2014
@@ -175,7 +175,7 @@ public class TestBalancerWithNodeGroup {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
waitForHeartBeat(totalUsedSpace, totalCapacity);
@@ -189,7 +189,7 @@ public class TestBalancerWithNodeGroup {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
(r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
waitForHeartBeat(totalUsedSpace, totalCapacity);