Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/08/06 11:41:14 UTC

[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2441: HDDS-4929. Select target datanodes and containers to move for Container Balancer

lokeshj1703 commented on a change in pull request #2441:
URL: https://github.com/apache/ozone/pull/2441#discussion_r684164492



##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -131,28 +190,255 @@ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
     balancerConfiguration.setThreshold(0.99);
     containerBalancer.start(balancerConfiguration);
 
-    Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
+    // waiting for balance completed.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {}
+
     containerBalancer.stop();
+    Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
   }
 
   /**
-   * Checks whether ContainerBalancer stops when the limit of
-   * MaxDatanodesToBalance is reached.
+   * ContainerBalancer should not involve more datanodes than the
+   * MaxDatanodesToInvolvePerIteration limit.
    */
   @Test
-  public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
-    balancerConfiguration.setMaxDatanodesToBalance(2);
+  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() {
+    balancerConfiguration.setMaxDatanodesToInvolvePerIteration(0.3d);
+    balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB);
+    balancerConfiguration.setThreshold(0.01);
+    balancerConfiguration.setIdleIteration(1);
+    containerBalancer.start(balancerConfiguration);
+
+    // waiting for balance completed.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {}
+
+    Assert.assertFalse(

Review comment:
       NIT: We could also add assertions for metrics here. Can be done in a separate PR.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -224,86 +291,284 @@ private boolean initializeIteration() {
         withinThresholdUtilizedNodes.add(datanodeUsageInfo);
       }
     }
+    metrics.setDatanodesNumToBalance(new LongMetric(countDatanodesToBalance));
+    // TODO update dataSizeToBalanceGB metric with overLoadedBytes and
+    //  underLoadedBytes
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
+    }
+
+    LOG.info("Container Balancer has identified {} Over-Utilized and {} " +
+            "Under-Utilized Datanodes that need to be balanced.",
+        overUtilizedNodes.size(), underUtilizedNodes.size());
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
+    return true;
+  }
+
+  private IterationResult doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+    moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      IterationResult result = checkConditionsForBalancing();
+      if (result != null) {
+        LOG.info("Conditions for balancing failed. Exiting current " +
+            "iteration...");

Review comment:
       NIT: This is not a failure. We can just print the enum to indicate end of iteration.
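
       A minimal sketch of what this could look like, assuming the non-null value returned by checkConditionsForBalancing() names the reason the iteration ended. Only ITERATION_COMPLETED appears in this diff; the other enum constants and the helper name below are hypothetical and used for illustration only.

```java
public class IterationLogSketch {
  // Hypothetical stand-in for the balancer's IterationResult enum. Only
  // ITERATION_COMPLETED is taken from the diff; the rest are assumed names.
  enum IterationResult {
    ITERATION_COMPLETED,
    MAX_DATANODES_TO_INVOLVE_REACHED,
    MAX_SIZE_TO_MOVE_REACHED
  }

  // Builds the text that would be handed to LOG.info(...). It reports the
  // enum value itself instead of describing the outcome as a failure.
  static String endOfIterationMessage(IterationResult result) {
    return "Exiting current iteration: " + result;
  }

  public static void main(String[] args) {
    System.out.println(
        endOfIterationMessage(IterationResult.MAX_SIZE_TO_MOVE_REACHED));
  }
}
```

       In the balancer itself the equivalent would presumably be a single parameterized call such as LOG.info("Exiting current iteration: {}", result), which lets the enum name carry the reason.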

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -224,86 +291,284 @@ private boolean initializeIteration() {
         withinThresholdUtilizedNodes.add(datanodeUsageInfo);
       }
     }
+    metrics.setDatanodesNumToBalance(new LongMetric(countDatanodesToBalance));
+    // TODO update dataSizeToBalanceGB metric with overLoadedBytes and
+    //  underLoadedBytes
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
+    }
+
+    LOG.info("Container Balancer has identified {} Over-Utilized and {} " +
+            "Under-Utilized Datanodes that need to be balanced.",
+        overUtilizedNodes.size(), underUtilizedNodes.size());
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
+    return true;
+  }
+
+  private IterationResult doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+    moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      IterationResult result = checkConditionsForBalancing();
+      if (result != null) {
+        LOG.info("Conditions for balancing failed. Exiting current " +
+            "iteration...");
+        return result;
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        if (moveContainer(source, moveSelection)) {
+          // consider move successful for now, and update selection criteria
+          potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+              selectedTargets, moveSelection, source);
+        }
+      }
+    }
+
+    // if not all underUtilized nodes have been selected, try to match
+    // withinThresholdUtilized nodes with underUtilized nodes
+    if (selectedTargets.size() < underUtilizedNodes.size()) {
+      potentialTargets.removeAll(selectedTargets);
+      Collections.reverse(withinThresholdUtilizedNodes);
+
+      for (DatanodeUsageInfo datanodeUsageInfo : withinThresholdUtilizedNodes) {
+        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+        IterationResult result = checkConditionsForBalancing();
+        if (result != null) {
+          LOG.info("Conditions for balancing failed. Exiting current " +
+              "iteration...");
+          return result;
+        }
+
+        ContainerMoveSelection moveSelection =
+            matchSourceWithTarget(source, potentialTargets);
+
+        if (moveSelection != null) {
+          LOG.info("ContainerBalancer is trying to move container {} from " +
+                  "source datanode {} to target datanode {}",
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
+              moveSelection.getTargetNode().getUuidString());
+
+          if (moveContainer(source, moveSelection)) {
+            // consider move successful for now, and update selection criteria
+            potentialTargets =
+                updateTargetsAndSelectionCriteria(potentialTargets,
+                    selectedTargets, moveSelection, source);
+          }
+        }
+      }
+    }
+
+    // check move results
+    for (Map.Entry<ContainerMoveSelection,
+            CompletableFuture<ReplicationManager.MoveResult>>
+        futureEntry : moveSelectionToFutureMap.entrySet()) {
+      ContainerMoveSelection moveSelection = futureEntry.getKey();
+      CompletableFuture<ReplicationManager.MoveResult> future =
+          futureEntry.getValue();
       try {
-        Thread.sleep(50);
+        ReplicationManager.MoveResult result = future.get(
+            config.getMoveTimeout().toMillis(), TimeUnit.MILLISECONDS);
+        if (result == ReplicationManager.MoveResult.COMPLETED) {
+          metrics.incrementMovedContainersNum(1);
+          //TODO update metrics with size moved in this iteration
+        }
       } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+        LOG.warn("Container move for container {} was interrupted.",
+            moveSelection.getContainerID(), e);
+      } catch (ExecutionException e) {
+        LOG.warn("Container move for container {} completed exceptionally.",
+            moveSelection.getContainerID(), e);
+      } catch (TimeoutException e) {
+        LOG.warn("Container move for container {} timed out.",
+            moveSelection.getContainerID(), e);
       }
-      /////////////////////////////
+    }
+    return IterationResult.ITERATION_COMPLETED;
+  }
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
-        return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" +
-            " balanced.");
-      }
+  /**
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   *
+   * @param potentialTargets Collection of potential targets to move
+   *                         container to
+   * @return ContainerMoveSelection containing the selected target and container
+   */
+  private ContainerMoveSelection matchSourceWithTarget(

Review comment:
       NIT: There are a few info logs in matchSourceWithTarget and checkConditionsForBalancing which can be converted to debug.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -44,23 +54,70 @@
           " of the entire cluster) no more than the threshold value.")
   private String threshold = "0.1";
 
-  @Config(key = "datanodes.balanced.max", type = ConfigType.INT,
-      defaultValue = "5", tags = {ConfigTag.BALANCER}, description = "The " +
-      "maximum number of datanodes that should be balanced. Container " +
-      "Balancer will not balance more number of datanodes than this limit.")
-  private int maxDatanodesToBalance = 5;
+  @Config(key = "datanodes.involved.max.per.iteration", type = ConfigType.AUTO,

Review comment:
       NIT: rename `max` to `ratio` or something better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


