You are viewing a plain text version of this content. The link to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:31 UTC
[26/50] [abbrv] hadoop git commit: HDFS-8890. Allow admin to specify which blockpools the balancer should run on. (Chris Trezzo via mingma)
HDFS-8890. Allow admin to specify which blockpools the balancer should run on. (Chris Trezzo via mingma)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d31a41c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d31a41c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d31a41c3
Branch: refs/heads/YARN-1197
Commit: d31a41c35927f02f2fb40d19380b5df4bb2b6d57
Parents: de928d5
Author: Ming Ma <mi...@apache.org>
Authored: Wed Sep 2 15:55:42 2015 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Wed Sep 2 15:55:42 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/server/balancer/Balancer.java | 82 ++++++---
.../src/site/markdown/HDFSCommands.md | 2 +
.../hdfs/server/balancer/TestBalancer.java | 43 ++++-
.../TestBalancerWithMultipleNameNodes.java | 179 ++++++++++++++++---
5 files changed, 253 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d31a41c3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7a685f5..e68c011 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -880,6 +880,9 @@ Release 2.8.0 - UNRELEASED
HDFS-328. Improve fs -setrep error message for invalid replication factors.
(Daniel Templeton via wang)
+ HDFS-8890. Allow admin to specify which blockpools the balancer should run
+ on. (Chris Trezzo via mingma)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d31a41c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index fe6e4c3..259b280 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -179,6 +179,8 @@ public class Balancer {
+ "\tExcludes the specified datanodes."
+ "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tIncludes only the specified datanodes."
+ + "\n\t[-blockpools <comma-separated list of blockpool ids>]"
+ + "\tThe balancer will only run on blockpools included in this list."
+ "\n\t[-idleiterations <idleiterations>]"
+ "\tNumber of consecutive idle iterations (-1 for Infinite) before "
+ "exit."
@@ -652,22 +654,27 @@ public class Balancer {
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
- final Balancer b = new Balancer(nnc, p, conf);
- final Result r = b.runOneIteration();
- r.print(iteration, System.out);
-
- // clean all lists
- b.resetData(conf);
- if (r.exitStatus == ExitStatus.IN_PROGRESS) {
- done = false;
- } else if (r.exitStatus != ExitStatus.SUCCESS) {
- //must be an error statue, return.
- return r.exitStatus.getExitCode();
- }
- }
+ if (p.blockpools.size() == 0
+ || p.blockpools.contains(nnc.getBlockpoolID())) {
+ final Balancer b = new Balancer(nnc, p, conf);
+ final Result r = b.runOneIteration();
+ r.print(iteration, System.out);
+
+ // clean all lists
+ b.resetData(conf);
+ if (r.exitStatus == ExitStatus.IN_PROGRESS) {
+ done = false;
+ } else if (r.exitStatus != ExitStatus.SUCCESS) {
+ // must be an error statue, return.
+ return r.exitStatus.getExitCode();
+ }
- if (!done) {
- Thread.sleep(sleeptime);
+ if (!done) {
+ Thread.sleep(sleeptime);
+ }
+ } else {
+ LOG.info("Skipping blockpool " + nnc.getBlockpoolID());
+ }
}
}
} finally {
@@ -699,12 +706,12 @@ public class Balancer {
}
static class Parameters {
- static final Parameters DEFAULT = new Parameters(
- BalancingPolicy.Node.INSTANCE, 10.0,
- NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
- Collections.<String>emptySet(), Collections.<String>emptySet(),
- Collections.<String>emptySet(),
- false);
+ static final Parameters DEFAULT =
+ new Parameters(BalancingPolicy.Node.INSTANCE, 10.0,
+ NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+ Collections.<String> emptySet(), Collections.<String> emptySet(),
+ Collections.<String> emptySet(), Collections.<String> emptySet(),
+ false);
final BalancingPolicy policy;
final double threshold;
@@ -718,19 +725,25 @@ public class Balancer {
*/
final Set<String> sourceNodes;
/**
+ * A set of block pools to run the balancer on.
+ */
+ final Set<String> blockpools;
+ /**
* Whether to run the balancer during upgrade.
*/
final boolean runDuringUpgrade;
Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
Set<String> excludedNodes, Set<String> includedNodes,
- Set<String> sourceNodes, boolean runDuringUpgrade) {
+ Set<String> sourceNodes, Set<String> blockpools,
+ boolean runDuringUpgrade) {
this.policy = policy;
this.threshold = threshold;
this.maxIdleIteration = maxIdleIteration;
this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes;
this.sourceNodes = sourceNodes;
+ this.blockpools = blockpools;
this.runDuringUpgrade = runDuringUpgrade;
}
@@ -742,10 +755,11 @@ public class Balancer {
+ " #excluded nodes = %s,"
+ " #included nodes = %s,"
+ " #source nodes = %s,"
+ + " #blockpools = %s,"
+ " run during upgrade = %s]",
- Balancer.class.getSimpleName(), getClass().getSimpleName(),
- policy, threshold, maxIdleIteration,
- excludedNodes.size(), includedNodes.size(), sourceNodes.size(),
+ Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
+ threshold, maxIdleIteration, excludedNodes.size(),
+ includedNodes.size(), sourceNodes.size(), blockpools.size(),
runDuringUpgrade);
}
}
@@ -789,6 +803,7 @@ public class Balancer {
Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
+ Set<String> blockpools = Parameters.DEFAULT.blockpools;
boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
if (args != null) {
@@ -828,6 +843,14 @@ public class Balancer {
} else if ("-source".equalsIgnoreCase(args[i])) {
sourceNodes = new HashSet<>();
i = processHostList(args, i, "source", sourceNodes);
+ } else if ("-blockpools".equalsIgnoreCase(args[i])) {
+ checkArgument(
+ ++i < args.length,
+ "blockpools value is missing: args = "
+ + Arrays.toString(args));
+ blockpools = parseBlockPoolList(args[i]);
+ LOG.info("Balancer will run on the following blockpools: "
+ + blockpools.toString());
} else if ("-idleiterations".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"idleiterations value is missing: args = " + Arrays
@@ -853,8 +876,8 @@ public class Balancer {
}
}
- return new Parameters(policy, threshold, maxIdleIteration,
- excludedNodes, includedNodes, sourceNodes, runDuringUpgrade);
+ return new Parameters(policy, threshold, maxIdleIteration, excludedNodes,
+ includedNodes, sourceNodes, blockpools, runDuringUpgrade);
}
private static int processHostList(String[] args, int i, String type,
@@ -881,6 +904,11 @@ public class Balancer {
return i;
}
+  /**
+   * Parses the value of the -blockpools option into a set of blockpool ids.
+   * Splitting and trimming are delegated to StringUtils#getTrimmedStrings;
+   * repeated ids collapse into one entry because the result is a set. An
+   * empty entry between two commas (e.g. "bp-1,,bp-2") is kept as the empty
+   * string.
+   *
+   * @param string comma-separated blockpool ids as given on the command line
+   * @return a new, mutable set holding the parsed ids
+   */
+  private static Set<String> parseBlockPoolList(String string) {
+    final Set<String> poolIds = new HashSet<String>();
+    Collections.addAll(poolIds, StringUtils.getTrimmedStrings(string));
+    return poolIds;
+  }
+
private static void printUsage(PrintStream out) {
out.println(USAGE + "\n");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d31a41c3/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 1c2026c..8bbcbb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -266,6 +266,7 @@ Usage:
[-policy <policy>]
[-exclude [-f <hosts-file> | <comma-separated list of hosts>]]
[-include [-f <hosts-file> | <comma-separated list of hosts>]]
+ [-blockpools <comma-separated list of blockpool ids>]
[-idleiterations <idleiterations>]
| COMMAND\_OPTION | Description |
@@ -274,6 +275,7 @@ Usage:
| `-threshold` \<threshold\> | Percentage of disk capacity. This overwrites the default threshold. |
| `-exclude -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Excludes the specified datanodes from being balanced by the balancer. |
| `-include -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Includes only the specified datanodes to be balanced by the balancer. |
+| `-blockpools` \<comma-separated list of blockpool ids\> | The balancer will only run on blockpools included in this list. |
| `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d31a41c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index c1ed758..b0223d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -644,7 +644,7 @@ public class TestBalancer {
Balancer.Parameters.DEFAULT.maxIdleIteration,
nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
Balancer.Parameters.DEFAULT.sourceNodes,
- false);
+ Balancer.Parameters.DEFAULT.blockpools, false);
}
int expectedExcludedNodes = 0;
@@ -885,7 +885,7 @@ public class TestBalancer {
Balancer.Parameters.DEFAULT.maxIdleIteration,
datanodes, Balancer.Parameters.DEFAULT.includedNodes,
Balancer.Parameters.DEFAULT.sourceNodes,
- false);
+ Balancer.Parameters.DEFAULT.blockpools, false);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally {
@@ -1067,6 +1067,34 @@ public class TestBalancer {
} catch (IllegalArgumentException e) {
}
+
+ parameters = new String[] { "-blockpools" };
+ try {
+ Balancer.Cli.parse(parameters);
+ fail("IllegalArgumentException is expected when a value "
+ + "is not specified for the blockpool flag");
+ } catch (IllegalArgumentException e) {
+
+ }
+ }
+
+  /**
+   * Asserts that the parsed -blockpools value holds exactly the given ids:
+   * same number of entries and every expected id present.
+   *
+   * @param p parameters returned by Balancer.Cli#parse
+   * @param expectedIds the blockpool ids the parsed set must contain
+   */
+  private static void assertBlockpools(Balancer.Parameters p,
+      String... expectedIds) {
+    assertEquals(expectedIds.length, p.blockpools.size());
+    for (String id : expectedIds) {
+      if (!p.blockpools.contains(id)) {
+        fail("blockpool '" + id + "' missing from parsed set " + p.blockpools);
+      }
+    }
+  }
+
+  @Test
+  public void testBalancerCliParseBlockpools() {
+    assertBlockpools(
+        Balancer.Cli.parse(new String[] { "-blockpools", "bp-1,bp-2,bp-3" }),
+        "bp-1", "bp-2", "bp-3");
+    assertBlockpools(
+        Balancer.Cli.parse(new String[] { "-blockpools", "bp-1" }), "bp-1");
+    // An empty entry between two commas is kept as the empty string, so the
+    // set has three members rather than two.
+    assertBlockpools(
+        Balancer.Cli.parse(new String[] { "-blockpools", "bp-1,,bp-2" }),
+        "bp-1", "", "bp-2");
+    // A trailing comma does not yield an extra (empty) entry.
+    assertBlockpools(
+        Balancer.Cli.parse(new String[] { "-blockpools", "bp-1," }), "bp-1");
}
/**
@@ -1385,7 +1413,7 @@ public class TestBalancer {
Parameters.DEFAULT.excludedNodes,
Parameters.DEFAULT.includedNodes,
Parameters.DEFAULT.sourceNodes,
- true);
+ Balancer.Parameters.DEFAULT.blockpools, true);
assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, runDuringUpgrade, conf));
@@ -1588,7 +1616,8 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
- Collections.<String> emptySet(), false);
+ Collections.<String> emptySet(),
+ Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
@@ -1607,7 +1636,7 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
- sourceNodes, false);
+ sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
@@ -1622,7 +1651,7 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
- sourceNodes, false);
+ sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
@@ -1639,7 +1668,7 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
- sourceNodes, false);
+ sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d31a41c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
index f51757c..b07ad89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
@@ -21,8 +21,13 @@ import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -42,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
@@ -60,6 +67,7 @@ public class TestBalancerWithMultipleNameNodes {
private static final long CAPACITY = 500L;
private static final String RACK0 = "/rack0";
private static final String RACK1 = "/rack1";
+ private static final String RACK2 = "/rack2";
private static final String FILE_NAME = "/tmp.txt";
private static final Path FILE_PATH = new Path(FILE_NAME);
@@ -76,16 +84,20 @@ public class TestBalancerWithMultipleNameNodes {
final MiniDFSCluster cluster;
final ClientProtocol[] clients;
final short replication;
-
+ final Balancer.Parameters parameters;
+
Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
- Configuration conf) throws IOException {
+ Balancer.Parameters parameters, Configuration conf) throws IOException {
this.conf = conf;
this.cluster = cluster;
clients = new ClientProtocol[nNameNodes];
for(int i = 0; i < nNameNodes; i++) {
clients[i] = cluster.getNameNode(i).getRpcServer();
}
- replication = (short)Math.max(1, nDataNodes - 1);
+ // hard coding replication factor to 1 so logical and raw HDFS size are
+ // equal
+ replication = 1;
+ this.parameters = parameters;
}
}
@@ -104,11 +116,9 @@ public class TestBalancerWithMultipleNameNodes {
) throws IOException, InterruptedException, TimeoutException {
final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
for(int n = 0; n < s.clients.length; n++) {
- final long fileLen = size/s.replication;
- createFile(s, n, fileLen);
-
- final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
- FILE_NAME, 0, fileLen).getLocatedBlocks();
+ createFile(s, n, size);
+ final List<LocatedBlock> locatedBlocks =
+ s.clients[n].getBlockLocations(FILE_NAME, 0, size).getLocatedBlocks();
final int numOfBlocks = locatedBlocks.size();
blocks[n] = new ExtendedBlock[numOfBlocks];
@@ -151,9 +161,14 @@ public class TestBalancerWithMultipleNameNodes {
wait(s.clients, totalUsed, totalCapacity);
LOG.info("BALANCER 1");
+ // get storage reports for relevant blockpools so that we can compare
+ // blockpool usages after balancer has run
+ Map<Integer, DatanodeStorageReport[]> preBalancerPoolUsages =
+ getStorageReports(s);
+
// start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
- final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
+ final int r = Balancer.run(namenodes, s.parameters, s.conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
LOG.info("BALANCER 2");
@@ -189,7 +204,7 @@ public class TestBalancerWithMultipleNameNodes {
balanced = true;
for(int d = 0; d < used.length; d++) {
final double p = used[d]*100.0/cap[d];
- balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold;
+ balanced = p <= avg + s.parameters.threshold;
if (!balanced) {
if (i % 100 == 0) {
LOG.warn("datanodes " + d + " is not yet balanced: "
@@ -203,6 +218,89 @@ public class TestBalancerWithMultipleNameNodes {
}
}
LOG.info("BALANCER 6");
+ // cluster is balanced, verify that only selected blockpools were touched
+ Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages =
+ getStorageReports(s);
+ Assert.assertEquals(preBalancerPoolUsages.size(),
+ postBalancerPoolUsages.size());
+ for (Map.Entry<Integer, DatanodeStorageReport[]> entry
+ : preBalancerPoolUsages.entrySet()) {
+ compareTotalPoolUsage(entry.getValue(),
+ postBalancerPoolUsages.get(entry.getKey()));
+ }
+ }
+
+  /**
+   * Asserts that every datanode reports the same total blockpool usage
+   * before and after the balancer run, i.e. that the balancer left the
+   * blockpool these reports were fetched for untouched.
+   *
+   * @param preReports storage reports taken before the balancer run
+   * @param postReports storage reports taken after the balancer run
+   */
+  private static void compareTotalPoolUsage(DatanodeStorageReport[] preReports,
+      DatanodeStorageReport[] postReports) {
+    Assert.assertNotNull(preReports);
+    Assert.assertNotNull(postReports);
+    Assert.assertEquals(preReports.length, postReports.length);
+    for (DatanodeStorageReport preReport : preReports) {
+      final String dnUuid = preReport.getDatanodeInfo().getDatanodeUuid();
+      final DatanodeStorageReport postReport =
+          findReportByDatanodeUuid(postReports, dnUuid);
+      // Fail rather than silently skip a datanode absent from the post-run
+      // reports; otherwise its usage would never be compared.
+      Assert.assertNotNull("No post-balancer storage report for datanode "
+          + dnUuid, postReport);
+      final long preUsage = getTotalPoolUsage(preReport);
+      final long postUsage = getTotalPoolUsage(postReport);
+      // Log before asserting so a failing run still records both values.
+      LOG.info("Comparison of datanode pool usage pre/post balancer run. "
+          + "PrePoolUsage: " + preUsage + ", PostPoolUsage: " + postUsage);
+      Assert.assertEquals(preUsage, postUsage);
+    }
+  }
+
+  /**
+   * Returns the report whose datanode has the given UUID, or null if none of
+   * the reports matches.
+   */
+  private static DatanodeStorageReport findReportByDatanodeUuid(
+      DatanodeStorageReport[] reports, String dnUuid) {
+    for (DatanodeStorageReport report : reports) {
+      if (report.getDatanodeInfo().getDatanodeUuid().equals(dnUuid)) {
+        return report;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Adds up StorageReport#getBlockPoolUsed over every storage listed in a
+   * single datanode's storage report.
+   *
+   * @param report the storage report of one datanode
+   * @return the summed blockpool usage across that datanode's storages
+   */
+  private static long getTotalPoolUsage(DatanodeStorageReport report) {
+    long total = 0L;
+    for (StorageReport storage : report.getStorageReports()) {
+      total += storage.getBlockPoolUsed();
+    }
+    return total;
+  }
+
+  /**
+   * Collects live datanode storage reports for every namenode whose
+   * blockpool is NOT listed in the balancer's blockpools parameter, so the
+   * caller can later check those pools were left alone. When the parameter
+   * is empty no pool needs tracking and an empty map is returned.
+   *
+   * @param s suite for the test
+   * @return storage reports keyed by the namenode index within the suite
+   * @throws IOException if fetching a datanode storage report fails
+   */
+  private static Map<Integer,
+      DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException {
+    final Set<String> balancedPools = s.parameters.blockpools;
+    if (balancedPools.size() == 0) {
+      return Collections.emptyMap();
+    }
+    final Map<Integer, DatanodeStorageReport[]> untouchedPools =
+        new HashMap<Integer, DatanodeStorageReport[]>();
+    // NOTE: assumes s.clients[nn] talks to the namenode returned by
+    // cluster#getNamesystem(nn), i.e. both use the same index order.
+    for (int nn = 0; nn < s.clients.length; nn++) {
+      final String poolId = s.cluster.getNamesystem(nn).getBlockPoolId();
+      if (!balancedPools.contains(poolId)) {
+        LOG.info("Tracking usage of blockpool id: " + poolId);
+        untouchedPools.put(nn,
+            s.clients[nn].getDatanodeStorageReport(DatanodeReportType.LIVE));
+      }
+    }
+    LOG.info("Tracking " + untouchedPools.size()
+        + " blockpool(s) for pre/post balancer usage.");
+    return untouchedPools;
+  }
private static void sleep(long ms) {
@@ -220,25 +318,31 @@ public class TestBalancerWithMultipleNameNodes {
}
/**
- * First start a cluster and fill the cluster up to a certain size.
- * Then redistribute blocks according the required distribution.
- * Finally, balance the cluster.
- *
+ * First start a cluster and fill the cluster up to a certain size. Then
+ * redistribute blocks according the required distribution. Finally, balance
+ * the cluster.
+ *
* @param nNameNodes Number of NameNodes
- * @param distributionPerNN The distribution for each NameNode.
+ * @param nNameNodesToBalance Number of NameNodes to run the balancer on
+ * @param distributionPerNN The distribution for each NameNode.
* @param capacities Capacities of the datanodes
* @param racks Rack names
* @param conf Configuration
*/
private void unevenDistribution(final int nNameNodes,
- long distributionPerNN[], long capacities[], String[] racks,
- Configuration conf) throws Exception {
+ final int nNameNodesToBalance, long distributionPerNN[],
+ long capacities[], String[] racks, Configuration conf) throws Exception {
LOG.info("UNEVEN 0");
final int nDataNodes = distributionPerNN.length;
if (capacities.length != nDataNodes || racks.length != nDataNodes) {
throw new IllegalArgumentException("Array length is not the same");
}
+ if (nNameNodesToBalance > nNameNodes) {
+ throw new IllegalArgumentException("Number of namenodes to balance is "
+ + "greater than the number of namenodes.");
+ }
+
// calculate total space that need to be filled
final long usedSpacePerNN = TestBalancer.sum(distributionPerNN);
@@ -248,7 +352,7 @@ public class TestBalancerWithMultipleNameNodes {
LOG.info("UNEVEN 1");
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new Configuration(conf))
- .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
+ .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes)
.racks(racks)
.simulatedCapacities(capacities)
@@ -258,7 +362,7 @@ public class TestBalancerWithMultipleNameNodes {
cluster.waitActive();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
LOG.info("UNEVEN 3");
- final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+ final Suite s = new Suite(cluster, nNameNodes, nDataNodes, null, conf);
blocks = generateBlocks(s, usedSpacePerNN);
LOG.info("UNEVEN 4");
} finally {
@@ -280,7 +384,20 @@ public class TestBalancerWithMultipleNameNodes {
try {
cluster.waitActive();
LOG.info("UNEVEN 12");
- final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+ Set<String> blockpools = new HashSet<String>();
+ for (int i = 0; i < nNameNodesToBalance; i++) {
+ blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
+ }
+ Balancer.Parameters params =
+ new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy,
+ Balancer.Parameters.DEFAULT.threshold,
+ Balancer.Parameters.DEFAULT.maxIdleIteration,
+ Balancer.Parameters.DEFAULT.excludedNodes,
+ Balancer.Parameters.DEFAULT.includedNodes,
+ Balancer.Parameters.DEFAULT.sourceNodes, blockpools,
+ Balancer.Parameters.DEFAULT.runDuringUpgrade);
+ final Suite s =
+ new Suite(cluster, nNameNodes, nDataNodes, params, conf);
for(int n = 0; n < nNameNodes; n++) {
// redistribute blocks
final Block[][] blocksDN = TestBalancer.distributeBlocks(
@@ -336,7 +453,9 @@ public class TestBalancerWithMultipleNameNodes {
try {
cluster.waitActive();
LOG.info("RUN_TEST 1");
- final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+ final Suite s =
+ new Suite(cluster, nNameNodes, nDataNodes,
+ Balancer.Parameters.DEFAULT, conf);
long totalCapacity = TestBalancer.sum(capacities);
LOG.info("RUN_TEST 2");
@@ -378,10 +497,26 @@ public class TestBalancerWithMultipleNameNodes {
+  /**
+   * Two federated namenodes with uneven usage; both blockpools (2 of 2) are
+   * passed to the balancer.
+   */
@Test
public void testUnevenDistribution() throws Exception {
final Configuration conf = createConf();
- unevenDistribution(2,
+ unevenDistribution(2, 2,
new long[] {30*CAPACITY/100, 5*CAPACITY/100},
new long[]{CAPACITY, CAPACITY},
new String[] {RACK0, RACK1},
conf);
}
+
+  /**
+   * Two federated namenodes, but only the blockpool of namenode 0 is handed
+   * to the balancer; the other pool's usage must stay unchanged.
+   */
+  @Test
+  public void testBalancing1OutOf2Blockpools() throws Exception {
+    final Configuration conf = createConf();
+    final long[] distribution = { 30 * CAPACITY / 100, 5 * CAPACITY / 100 };
+    final long[] capacities = { CAPACITY, CAPACITY };
+    final String[] racks = { RACK0, RACK1 };
+    unevenDistribution(2, 1, distribution, capacities, racks, conf);
+  }
+
+  /**
+   * Three federated namenodes; the blockpools of namenodes 0 and 1 are
+   * balanced while the third pool's usage must stay unchanged.
+   */
+  @Test
+  public void testBalancing2OutOf3Blockpools() throws Exception {
+    final Configuration conf = createConf();
+    final long[] distribution =
+        { 30 * CAPACITY / 100, 5 * CAPACITY / 100, 10 * CAPACITY / 100 };
+    final long[] capacities = { CAPACITY, CAPACITY, CAPACITY };
+    final String[] racks = { RACK0, RACK1, RACK2 };
+    unevenDistribution(3, 2, distribution, capacities, racks, conf);
+  }
}