Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2020/12/02 23:53:33 UTC
[hadoop] branch trunk updated: HDFS-14904. Add Option to let Balancer prefer highly utilized nodes in each iteration (#2483). Contributed by Leon Gao.
This is an automated email from the ASF dual-hosted git repository.
jing9 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ff2409 HDFS-14904. Add Option to let Balancer prefer highly utilized nodes in each iteration (#2483). Contributed by Leon Gao.
6ff2409 is described below
commit 6ff2409b31766cee71cedc30608f4a22e06d31e7
Author: LeonGao <li...@uber.com>
AuthorDate: Wed Dec 2 15:53:09 2020 -0800
HDFS-14904. Add Option to let Balancer prefer highly utilized nodes in each iteration (#2483). Contributed by Leon Gao.
---
.../hadoop/hdfs/server/balancer/Balancer.java | 37 +++++++-
.../hdfs/server/balancer/BalancerParameters.java | 18 +++-
.../hadoop/hdfs/server/balancer/TestBalancer.java | 102 +++++++++++++++++++++
3 files changed, 153 insertions(+), 4 deletions(-)
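In short: when the new -sortTopNodes flag is passed on the balancer command line, the Balancer sorts its over-utilized sources in descending order of utilization before scheduling, so the most heavily used datanodes are drained first in each iteration. Short, hedged Java sketches of the sort, the parameter plumbing, and the test's arithmetic follow the corresponding diffs below.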
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 33b5fa4..e5f9e8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -29,10 +29,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -199,7 +201,10 @@ public class Balancer {
+ "\tWhether to run the balancer during an ongoing HDFS upgrade."
+ "This is usually not desired since it will not affect used space "
+ "on over-utilized machines."
- + "\n\t[-asService]\tRun as a long running service.";
+ + "\n\t[-asService]\tRun as a long running service."
+ + "\n\t[-sortTopNodes]"
+ + "\tSort datanodes based on the utilization so "
+ + "that highly utilized datanodes get scheduled first.";
@VisibleForTesting
private static volatile boolean serviceRunning = false;
@@ -215,6 +220,7 @@ public class Balancer {
private final double threshold;
private final long maxSizeToMove;
private final long defaultBlockSize;
+ private final boolean sortTopNodes;
// all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -328,6 +334,7 @@ public class Balancer {
this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();
this.runDuringUpgrade = p.getRunDuringUpgrade();
+ this.sortTopNodes = p.getSortTopNodes();
this.maxSizeToMove = getLongBytes(conf,
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@@ -374,6 +381,8 @@ public class Balancer {
policy.accumulateSpaces(r);
}
policy.initAvgUtilization();
+ // Store the capacity % of over utilized nodes for sorting, if needed.
+ Map<Source, Double> overUtilizedPercentage = new HashMap<>();
// create network topology and classify utilization collections:
// over-utilized, above-average, below-average and under-utilized.
@@ -383,7 +392,7 @@ public class Balancer {
final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
for(StorageType t : StorageType.getMovableTypes()) {
final Double utilization = policy.getUtilization(r, t);
- if (utilization == null) { // datanode does not have such storage type
+ if (utilization == null) { // datanode does not have such storage type
continue;
}
@@ -409,6 +418,7 @@ public class Balancer {
} else {
overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
overUtilized.add(s);
+ overUtilizedPercentage.put(s, utilization);
}
g = s;
} else {
@@ -424,6 +434,10 @@ public class Balancer {
}
}
+ if (sortTopNodes) {
+ sortOverUtilized(overUtilizedPercentage);
+ }
+
logUtilizationCollections();
Preconditions.checkState(dispatcher.getStorageGroupMap().size()
@@ -435,6 +449,21 @@ public class Balancer {
return Math.max(overLoadedBytes, underLoadedBytes);
}
+ private void sortOverUtilized(Map<Source, Double> overUtilizedPercentage) {
+ Preconditions.checkState(overUtilized instanceof List,
+ "Collection overUtilized is not a List.");
+
+ LOG.info("Sorting over-utilized nodes by capacity" +
+ " to bring down top used datanode capacity faster");
+
+ List<Source> list = (List<Source>) overUtilized;
+ list.sort(
+ (Source source1, Source source2) ->
+ (Double.compare(overUtilizedPercentage.get(source2),
+ overUtilizedPercentage.get(source1)))
+ );
+ }
+
private static long computeMaxSize2Move(final long capacity, final long remaining,
final double utilizationDiff, final long max) {
final double diff = Math.abs(utilizationDiff);
@@ -961,6 +990,10 @@ public class Balancer {
} else if ("-asService".equalsIgnoreCase(args[i])) {
b.setRunAsService(true);
LOG.info("Balancer will run as a long running service");
+ } else if ("-sortTopNodes".equalsIgnoreCase(args[i])) {
+ b.setSortTopNodes(true);
+ LOG.info("Balancer will sort nodes by" +
+ " capacity usage percentage to prioritize top used nodes");
} else {
throw new IllegalArgumentException("args = "
+ Arrays.toString(args));
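For readers skimming the hunk above: with -sortTopNodes, sortOverUtilized() reorders the over-utilized sources so the highest utilization comes first. A minimal standalone sketch of that ordering (plain strings stand in for Balancer's Source type; the class and variable names here are illustrative only):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch; String stands in for Balancer's Source type.
public class SortTopNodesSketch {
  public static void main(String[] args) {
    Map<String, Double> overUtilizedPercentage = new HashMap<>();
    overUtilizedPercentage.put("dn-80", 0.80);
    overUtilizedPercentage.put("dn-100", 1.00);
    overUtilizedPercentage.put("dn-95", 0.95);

    List<String> overUtilized = new ArrayList<>(overUtilizedPercentage.keySet());
    // Comparing source2 against source1 yields descending order,
    // so the fullest datanode is scheduled first.
    overUtilized.sort((source1, source2) -> Double.compare(
        overUtilizedPercentage.get(source2),
        overUtilizedPercentage.get(source1)));

    System.out.println(overUtilized); // [dn-100, dn-95, dn-80]
  }
}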
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
index cdca39f..e614327 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
@@ -47,6 +47,8 @@ final class BalancerParameters {
private final boolean runAsService;
+ private final boolean sortTopNodes;
+
static final BalancerParameters DEFAULT = new BalancerParameters();
private BalancerParameters() {
@@ -63,6 +65,7 @@ final class BalancerParameters {
this.blockpools = builder.blockpools;
this.runDuringUpgrade = builder.runDuringUpgrade;
this.runAsService = builder.runAsService;
+ this.sortTopNodes = builder.sortTopNodes;
}
BalancingPolicy getBalancingPolicy() {
@@ -101,16 +104,21 @@ final class BalancerParameters {
return this.runAsService;
}
+ boolean getSortTopNodes() {
+ return this.sortTopNodes;
+ }
+
@Override
public String toString() {
return String.format("%s.%s [%s," + " threshold = %s,"
+ " max idle iteration = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #source nodes = %s,"
- + " #blockpools = %s," + " run during upgrade = %s]",
+ + " #blockpools = %s," + " run during upgrade = %s]"
+ + " sort top nodes = %s",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
- runDuringUpgrade);
+ runDuringUpgrade, sortTopNodes);
}
static class Builder {
@@ -125,6 +133,7 @@ final class BalancerParameters {
private Set<String> blockpools = Collections.<String> emptySet();
private boolean runDuringUpgrade = false;
private boolean runAsService = false;
+ private boolean sortTopNodes = false;
Builder() {
}
@@ -174,6 +183,11 @@ final class BalancerParameters {
return this;
}
+ Builder setSortTopNodes(boolean shouldSortTopNodes) {
+ this.sortTopNodes = shouldSortTopNodes;
+ return this;
+ }
+
BalancerParameters build() {
return new BalancerParameters(this);
}
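A short, hedged sketch of how the new knob threads through the builder. BalancerParameters and its Builder are package-private, so a real caller would live in org.apache.hadoop.hdfs.server.balancer; only setters visible in this commit are used, and the class name below is made up for illustration:

package org.apache.hadoop.hdfs.server.balancer;

// Hypothetical in-package usage; all other builder knobs keep defaults.
public class SortTopNodesParamsSketch {
  public static void main(String[] args) {
    BalancerParameters p = new BalancerParameters.Builder()
        .setRunAsService(true)   // existing option, see Cli.parse above
        .setSortTopNodes(true)   // new option added by this patch
        .build();
    System.out.println(p);       // toString() now reports sort top nodes
  }
}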
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index a0f95f7..82d710d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.balancer;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
@@ -46,6 +47,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBER
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.junit.AfterClass;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -2209,6 +2211,106 @@ public class TestBalancer {
getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
}
+ @Test(timeout = 60000)
+ public void testBalancerWithSortTopNodes() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000);
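+ // A 30000-second heartbeat interval keeps datanodes from heartbeating
+ // on their own; the test triggers heartbeats explicitly below.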
+
+ final long capacity = 1000L;
+ final int diffBetweenNodes = 50;
+
+ // Set up the datanodes with two groups:
+ // 5 over-utilized nodes with 80%, 85%, 90%, 95%, 100% usage
+ // 2 under-utilized nodes with 0%, 5% usage
+ // With sortTopNodes option, 100% and 95% used ones will be chosen.
+ final int numOfOverUtilizedDn = 5;
+ final int numOfUnderUtilizedDn = 2;
+ final int totalNumOfDn = numOfOverUtilizedDn + numOfUnderUtilizedDn;
+ final long[] capacityArray = new long[totalNumOfDn];
+ Arrays.fill(capacityArray, capacity);
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(totalNumOfDn)
+ .simulatedCapacities(capacityArray)
+ .build();
+
+ cluster.setDataNodesDead();
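+ // All datanodes are marked dead up front; each loop below revives
+ // exactly one node so that its single-replica file lands only on
+ // that node, shaping the per-node usage.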
+
+ List<DataNode> dataNodes = cluster.getDataNodes();
+
+ // Create top used nodes
+ for (int i = 0; i < numOfOverUtilizedDn; i++) {
+ // Bring one node alive
+ DataNodeTestUtils.triggerHeartbeat(dataNodes.get(i));
+ DataNodeTestUtils.triggerBlockReport(dataNodes.get(i));
+ // Create nodes with: 80%, 85%, 90%, 95%, 100%.
+ int capacityForThisDatanode = (int)capacity
+ - diffBetweenNodes * (numOfOverUtilizedDn - i - 1);
+ createFile(cluster, new Path("test_big" + i),
+ capacityForThisDatanode, (short) 1, 0);
+ cluster.setDataNodesDead();
+ }
+
+ // Create under utilized nodes
+ for (int i = numOfUnderUtilizedDn - 1; i >= 0; i--) {
+ int index = i + numOfOverUtilizedDn;
+ // Bring one node alive
+ DataNodeTestUtils.triggerHeartbeat(dataNodes.get(index));
+ DataNodeTestUtils.triggerBlockReport(dataNodes.get(index));
+ // Create nodes with: 5%, 0%
+ int capacityForThisDatanode = diffBetweenNodes * i;
+ createFile(cluster,
+ new Path("test_small" + i),
+ capacityForThisDatanode, (short) 1, 0);
+ cluster.setDataNodesDead();
+ }
+
+ // Bring all nodes alive
+ cluster.triggerHeartbeats();
+ cluster.triggerBlockReports();
+ cluster.waitFirstBRCompleted(0, 6000);
+
+ final BalancerParameters p = Balancer.Cli.parse(new String[] {
+ "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+ "-threshold", "1",
+ "-sortTopNodes"
+ });
+
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+
+ // Set max-size-to-move to a small number
+ // so only the top two nodes will be chosen in one iteration.
+ conf.setLong(DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, 99L);
+
+ final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+
+ List<NameNodeConnector> connectors = NameNodeConnector
+ .newNameNodeConnectors(namenodes,
+ Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
+ BalancerParameters.DEFAULT.getMaxIdleIteration());
+ final Balancer b = new Balancer(connectors.get(0), p, conf);
+ Result balancerResult = b.runOneIteration();
+
+ cluster.triggerDeletionReports();
+ cluster.triggerBlockReports();
+ cluster.triggerHeartbeats();
+
+ DatanodeInfo[] datanodeReport = client
+ .getDatanodeReport(DatanodeReportType.ALL);
+
+ long maxUsage = 0;
+ for (int i = 0; i < totalNumOfDn; i++) {
+ maxUsage = Math.max(maxUsage, datanodeReport[i].getDfsUsed());
+ }
+
+ assertEquals(200, balancerResult.bytesAlreadyMoved);
+ // The 100% and 95% used nodes will be balanced, so the top used space will be 900.
+ assertEquals(900, maxUsage);
+ }
+
/**
* @param args
*/
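As a plain-arithmetic cross-check of the two assertions at the end of the test (the 100-byte per-source move size is inferred from the assertion bytesAlreadyMoved == 200, not taken from any Balancer API):

// Restates the test's expectations without HDFS.
public class SortTopNodesTestMath {
  public static void main(String[] args) {
    long[] used = {800, 850, 900, 950, 1000}; // five over-utilized nodes
    long movedPerSource = 100;                // inferred from the assertion

    // -sortTopNodes picks the two fullest sources (1000 and 950) first.
    used[4] -= movedPerSource;                // 1000 -> 900
    used[3] -= movedPerSource;                //  950 -> 850

    long maxUsage = 0;
    for (long u : used) {
      maxUsage = Math.max(maxUsage, u);
    }
    System.out.println("bytesAlreadyMoved = " + 2 * movedPerSource); // 200
    System.out.println("maxUsage = " + maxUsage);                    // 900
  }
}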