You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2020/12/02 23:53:33 UTC

[hadoop] branch trunk updated: HDFS-14904. Add Option to let Balancer prefer highly utilized nodes in each iteration (#2483). Contributed by Leon Gao.

This is an automated email from the ASF dual-hosted git repository.

jing9 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ff2409  HDFS-14904. Add Option to let Balancer prefer highly utilized nodes in each iteration (#2483). Contributed by Leon Gao.
6ff2409 is described below

commit 6ff2409b31766cee71cedc30608f4a22e06d31e7
Author: LeonGao <li...@uber.com>
AuthorDate: Wed Dec 2 15:53:09 2020 -0800

    HDFS-14904. Add Option to let Balancer prefer highly utilized nodes in each iteration (#2483). Contributed by Leon Gao.
---
 .../hadoop/hdfs/server/balancer/Balancer.java      |  37 +++++++-
 .../hdfs/server/balancer/BalancerParameters.java   |  18 +++-
 .../hadoop/hdfs/server/balancer/TestBalancer.java  | 102 +++++++++++++++++++++
 3 files changed, 153 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 33b5fa4..e5f9e8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -29,10 +29,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -199,7 +201,10 @@ public class Balancer {
       + "\tWhether to run the balancer during an ongoing HDFS upgrade."
       + "This is usually not desired since it will not affect used space "
       + "on over-utilized machines."
-      + "\n\t[-asService]\tRun as a long running service.";
+      + "\n\t[-asService]\tRun as a long running service."
+      + "\n\t[-sortTopNodes]"
+      + "\tSort datanodes based on the utilization so "
+      + "that highly utilized datanodes get scheduled first.";
 
   @VisibleForTesting
   private static volatile boolean serviceRunning = false;
@@ -215,6 +220,7 @@ public class Balancer {
   private final double threshold;
   private final long maxSizeToMove;
   private final long defaultBlockSize;
+  private final boolean sortTopNodes;
 
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -328,6 +334,7 @@ public class Balancer {
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();
     this.runDuringUpgrade = p.getRunDuringUpgrade();
+    this.sortTopNodes = p.getSortTopNodes();
 
     this.maxSizeToMove = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@@ -374,6 +381,8 @@ public class Balancer {
       policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
+    // Store the capacity % of over utilized nodes for sorting, if needed.
+    Map<Source, Double> overUtilizedPercentage = new HashMap<>();
 
     // create network topology and classify utilization collections: 
     //   over-utilized, above-average, below-average and under-utilized.
@@ -383,7 +392,7 @@ public class Balancer {
       final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
       for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
-        if (utilization == null) { // datanode does not have such storage type 
+        if (utilization == null) { // datanode does not have such storage type
           continue;
         }
         
@@ -409,6 +418,7 @@ public class Balancer {
           } else {
             overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             overUtilized.add(s);
+            overUtilizedPercentage.put(s, utilization);
           }
           g = s;
         } else {
@@ -424,6 +434,10 @@ public class Balancer {
       }
     }
 
+    if (sortTopNodes) {
+      sortOverUtilized(overUtilizedPercentage);
+    }
+
     logUtilizationCollections();
     
     Preconditions.checkState(dispatcher.getStorageGroupMap().size()
@@ -435,6 +449,21 @@ public class Balancer {
     return Math.max(overLoadedBytes, underLoadedBytes);
   }
 
+  /** Sorts overUtilized in place, highest utilization percentage first. */
+  private void sortOverUtilized(Map<Source, Double> overUtilizedPercentage) {
+    Preconditions.checkState(overUtilized instanceof List,
+        "Collection overUtilized is not a List.");
+
+    LOG.info("Sorting over-utilized nodes by capacity" +
+        " to bring down top used datanode capacity faster");
+
+    // Safe cast: the precondition above already verified the runtime type.
+    final List<Source> sources = (List<Source>) overUtilized;
+    // Operands are swapped (b before a) so the resulting order is descending.
+    sources.sort((a, b) -> Double.compare(
+        overUtilizedPercentage.get(b), overUtilizedPercentage.get(a)));
+  }
+
   private static long computeMaxSize2Move(final long capacity, final long remaining,
       final double utilizationDiff, final long max) {
     final double diff = Math.abs(utilizationDiff);
@@ -961,6 +990,10 @@ public class Balancer {
             } else if ("-asService".equalsIgnoreCase(args[i])) {
               b.setRunAsService(true);
               LOG.info("Balancer will run as a long running service");
+            } else if ("-sortTopNodes".equalsIgnoreCase(args[i])) {
+              b.setSortTopNodes(true);
+              LOG.info("Balancer will sort nodes by" +
+                  " capacity usage percentage to prioritize top used nodes");
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
index cdca39f..e614327 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
@@ -47,6 +47,8 @@ final class BalancerParameters {
 
   private final boolean runAsService;
 
+  private final boolean sortTopNodes;
+
   static final BalancerParameters DEFAULT = new BalancerParameters();
 
   private BalancerParameters() {
@@ -63,6 +65,7 @@ final class BalancerParameters {
     this.blockpools = builder.blockpools;
     this.runDuringUpgrade = builder.runDuringUpgrade;
     this.runAsService = builder.runAsService;
+    this.sortTopNodes = builder.sortTopNodes;
   }
 
   BalancingPolicy getBalancingPolicy() {
@@ -101,16 +104,21 @@ final class BalancerParameters {
     return this.runAsService;
   }
 
+  boolean getSortTopNodes() {
+    return this.sortTopNodes; // taken from the Builder at construction
+  }
+
   @Override
   public String toString() {
     return String.format("%s.%s [%s," + " threshold = %s,"
         + " max idle iteration = %s," + " #excluded nodes = %s,"
         + " #included nodes = %s," + " #source nodes = %s,"
-        + " #blockpools = %s," + " run during upgrade = %s]",
+        + " #blockpools = %s," + " run during upgrade = %s,"
+        + " sort top nodes = %s]", // ']' closes after the last field
         Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
         threshold, maxIdleIteration, excludedNodes.size(),
         includedNodes.size(), sourceNodes.size(), blockpools.size(),
-        runDuringUpgrade);
+        runDuringUpgrade, sortTopNodes);
   }
 
   static class Builder {
@@ -125,6 +133,7 @@ final class BalancerParameters {
     private Set<String> blockpools = Collections.<String> emptySet();
     private boolean runDuringUpgrade = false;
     private boolean runAsService = false;
+    private boolean sortTopNodes = false;
 
     Builder() {
     }
@@ -174,6 +183,11 @@ final class BalancerParameters {
       return this;
     }
 
+    Builder setSortTopNodes(boolean shouldSortTopNodes) {
+      this.sortTopNodes = shouldSortTopNodes; // builder default is false
+      return this; // returned so calls can be chained
+    }
+
     BalancerParameters build() {
       return new BalancerParameters(this);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index a0f95f7..82d710d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.balancer;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
@@ -46,6 +47,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBER
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.AfterClass;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -2209,6 +2211,106 @@ public class TestBalancer {
         getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
   }
 
+  @Test(timeout = 60000)
+  public void testBalancerWithSortTopNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000);
+
+    final long capacity = 1000L;
+    final int diffBetweenNodes = 50;
+
+    // Set up the datanodes with two groups:
+    // 5 over-utilized nodes with 80%, 85%, 90%, 95%, 100% usage
+    // 2 under-utilized nodes with 0%, 5% usage
+    // With -sortTopNodes, the 100% and 95% used nodes should be picked first.
+    final int numOfOverUtilizedDn = 5;
+    final int numOfUnderUtilizedDn = 2;
+    final int totalNumOfDn = numOfOverUtilizedDn + numOfUnderUtilizedDn;
+    final long[] capacityArray = new long[totalNumOfDn];
+    Arrays.fill(capacityArray, capacity);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(totalNumOfDn)
+        .simulatedCapacities(capacityArray)
+        .build();
+
+    cluster.setDataNodesDead();
+
+    List<DataNode> dataNodes = cluster.getDataNodes();
+
+    // Create the over-utilized nodes, one datanode per pass
+    for (int i = 0; i < numOfOverUtilizedDn; i++) {
+      // Bring one node alive
+      DataNodeTestUtils.triggerHeartbeat(dataNodes.get(i));
+      DataNodeTestUtils.triggerBlockReport(dataNodes.get(i));
+      // Sizes 800, 850, 900, 950, 1000: 80% .. 100% of the 1000 capacity.
+      int capacityForThisDatanode = (int)capacity
+          - diffBetweenNodes * (numOfOverUtilizedDn - i - 1);
+      createFile(cluster, new Path("test_big" + i),
+          capacityForThisDatanode, (short) 1, 0);
+      cluster.setDataNodesDead();
+    }
+
+    // Create the under-utilized nodes, one datanode per pass
+    for (int i = numOfUnderUtilizedDn - 1; i >= 0; i--) {
+      int index = i + numOfOverUtilizedDn;
+      // Bring one node alive
+      DataNodeTestUtils.triggerHeartbeat(dataNodes.get(index));
+      DataNodeTestUtils.triggerBlockReport(dataNodes.get(index));
+      // Sizes 50 then 0 (i counts down from 1): 5% and 0% of the capacity.
+      int capacityForThisDatanode = diffBetweenNodes * i;
+      createFile(cluster,
+          new Path("test_small" + i),
+          capacityForThisDatanode, (short) 1, 0);
+      cluster.setDataNodesDead();
+    }
+
+    // Bring all nodes alive
+    cluster.triggerHeartbeats();
+    cluster.triggerBlockReports();
+    cluster.waitFirstBRCompleted(0, 6000);
+
+    final BalancerParameters p = Balancer.Cli.parse(new String[] {
+        "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+        "-threshold", "1",
+        "-sortTopNodes"
+    });
+
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
+
+    // Set max-size-to-move to a small number
+    // so only the top two nodes will be chosen in one iteration.
+    conf.setLong(DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, 99L);
+
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+
+    List<NameNodeConnector> connectors = NameNodeConnector
+        .newNameNodeConnectors(namenodes,
+            Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
+            BalancerParameters.DEFAULT.getMaxIdleIteration());
+    final Balancer b = new Balancer(connectors.get(0), p, conf);
+    Result balancerResult = b.runOneIteration();
+
+    cluster.triggerDeletionReports();
+    cluster.triggerBlockReports();
+    cluster.triggerHeartbeats();
+
+    DatanodeInfo[] datanodeReport = client
+        .getDatanodeReport(DatanodeReportType.ALL);
+
+    long maxUsage = 0;
+    for (int i = 0; i < totalNumOfDn; i++) {
+      maxUsage = Math.max(maxUsage, datanodeReport[i].getDfsUsed());
+    }
+
+    assertEquals(200, balancerResult.bytesAlreadyMoved);
+    // The 100% and 95% used nodes are balanced, so the top usage becomes 900
+    assertEquals(900, maxUsage);
+  }
+
   /**
    * @param args
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org