Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/09/27 05:55:38 UTC

[GitHub] [ozone] sumitagrawl opened a new pull request, #3782: HDDS-7214 Continuous start & stop can have hanging threads in stopping

sumitagrawl opened a new pull request, #3782:
URL: https://github.com/apache/ozone/pull/3782

   ## What changes were proposed in this pull request?
   
   This PR addresses the following issues:
   1. A Start, Stop, Start sequence can leave the thread of the stopped run still running.
   2. ContainerBalancer is refactored into two classes, ContainerBalancer and ContainerBalancerTask.
      - The refactor avoids member variables being modified during start and stop.
      - It removes unnecessary locking; locking is required only for external interaction.
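A minimal sketch of the controller/task split in item 2, assuming one worker thread per run. `BalancerController`, `Task` and this `Status` enum are illustrative names, not the actual Ozone classes; only the RUNNING/STOPPING/STOPPED values mirror the PR's `Status`. The controller builds a fresh `Task` on every start, rejects a start while the previous run is RUNNING or STOPPING, and `stop()` joins the worker. Whether the real ContainerBalancer joins its thread is not shown here; joining is simply one way to guarantee that no thread from a stopped run is left behind.

```java
/**
 * Hypothetical sketch of the controller/task split: the controller creates a
 * fresh task for every start, so per-run state never leaks between runs.
 */
final class BalancerController {

  /** Lifecycle states of one run; the names mirror the PR's Status values. */
  enum Status { RUNNING, STOPPING, STOPPED }

  /** One balancing run; all mutable per-run state would live here. */
  static final class Task implements Runnable {
    private volatile Status status = Status.RUNNING;

    @Override
    public void run() {
      try {
        while (status == Status.RUNNING) {
          Thread.sleep(10);   // stand-in for one balancing iteration
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        status = Status.STOPPED;   // reached on normal exit, interrupt, or error
      }
    }

    void stop() {
      status = Status.STOPPING;
    }

    Status getStatus() {
      return status;
    }
  }

  private Task task;       // guarded by "this"
  private Thread thread;   // guarded by "this"

  /** Starts a new run; refuses while a previous run is RUNNING or STOPPING. */
  synchronized void start() {
    if (task != null && task.getStatus() != Status.STOPPED) {
      throw new IllegalStateException("Balancer is " + task.getStatus());
    }
    task = new Task();
    thread = new Thread(task, "balancer-task");
    thread.start();
  }

  /** Requests a stop and waits for the worker thread to exit. */
  void stop() {
    Thread worker;
    synchronized (this) {
      if (task == null || task.getStatus() != Status.RUNNING) {
        throw new IllegalStateException("Balancer is not running");
      }
      task.stop();
      worker = thread;
    }
    worker.interrupt();   // wake the task if it is sleeping
    try {
      worker.join();      // no thread is left behind once stop() returns
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  synchronized Status getStatus() {
    return task == null ? Status.STOPPED : task.getStatus();
  }
}
```

Because each start creates a new `Task`, a run that is still winding down can only change its own fields, never the state of the next run.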
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-7214
   https://issues.apache.org/jira/browse/HDDS-7215
   
   ## How was this patch tested?
   
   The patch is tested with unit tests covering the additional scenarios.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r990390247


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1072,15 +295,15 @@ private void validateState(boolean expectedRunning)
       LOG.warn("SCM is in safe mode");
       throw new IllegalContainerBalancerStateException("SCM is in safe mode");
     }
-    lock.lock();
-    try {
-      if (isBalancerRunning() != expectedRunning) {
-        throw new IllegalContainerBalancerStateException(
-            "Expect ContainerBalancer running state to be " + expectedRunning +
-                ", but running state is actually " + isBalancerRunning());
-      }
-    } finally {
-      lock.unlock();
+    if (!expectedRunning && !canBalancerStart()) {
+      throw new IllegalContainerBalancerStateException(
+          "Expect ContainerBalancer as not running state" +
+              ", but running state is actually " + getBalancerStatus());
+    }
+    if (expectedRunning && !canBalancerStop()) {
+      throw new IllegalContainerBalancerStateException(
+          "Expect ContainerBalancer as running state" +
+              ", but running state is actually " + getBalancerStatus());

Review Comment:
   This is required by the state machine:
   Start should be allowed only when no task is present or the task is STOPPED, but not while it is STOPPING. canBalancerStart() does this check.
   
   Stop should be allowed only when the task is RUNNING. Otherwise there is nothing to stop, so an exception is thrown when the task is STOPPING or not present.
   
   isBalancerRunning() only checks the running state and does not consider the STOPPING case.
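The start/stop rules in this comment can be written as two small predicates. This is a hypothetical sketch: `BalancerStateRules`, `canStart`, `canStop`, `isRunning` and `validate` are illustrative names, and a `null` status stands for "no task present". It also shows why a plain running check is not enough: it reports STOPPING as "not running", so it would let a new start through while the old thread is still stopping.

```java
/**
 * Hypothetical sketch of the balancer start/stop rules.
 * A null status means that no balancer task is present.
 */
final class BalancerStateRules {

  enum Status { RUNNING, STOPPING, STOPPED }

  private BalancerStateRules() { }

  /** Start is allowed when no task exists or the previous task has STOPPED. */
  static boolean canStart(Status current) {
    return current == null || current == Status.STOPPED;
  }

  /** Stop is allowed only while a task is RUNNING. */
  static boolean canStop(Status current) {
    return current == Status.RUNNING;
  }

  /** A plain running check: STOPPING looks like "not running" here. */
  static boolean isRunning(Status current) {
    return current == Status.RUNNING;
  }

  /** Same shape as validateState(expectedRunning) in the diff. */
  static void validate(Status current, boolean expectedRunning) {
    if (!expectedRunning && !canStart(current)) {
      throw new IllegalStateException(
          "Expected balancer not running, but status is " + current);
    }
    if (expectedRunning && !canStop(current)) {
      throw new IllegalStateException(
          "Expected balancer running, but status is " + current);
    }
  }
}
```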



[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r988740394


##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,102 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
-  public void testCalculationOfUtilization() {
-    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
-    for (int i = 0; i < nodesInCluster.size(); i++) {
-      Assertions.assertEquals(nodeUtilizations.get(i),
-          nodesInCluster.get(i).calculateUtilization(), 0.0001);
-    }
-
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster
-    Assertions.assertEquals(averageUtilization,
-        containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001);
-  }
-
-  /**
-   * Checks whether ContainerBalancer is correctly updating the list of
-   * unBalanced nodes with varying values of Threshold.
-   */
-  @Test
-  public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    List<DatanodeUsageInfo> expectedUnBalancedNodes;
-    List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
-
-    // check for random threshold values
-    for (int i = 0; i < 50; i++) {
-      double randomThreshold = RANDOM.nextDouble() * 100;
-
-      balancerConfiguration.setThreshold(randomThreshold);
-      startBalancer(balancerConfiguration);
-
-      // waiting for balance completed.
-      // TODO: this is a temporary implementation for now
-      // modify this after balancer is fully completed
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) { }
-
-      expectedUnBalancedNodes =
-          determineExpectedUnBalancedNodes(randomThreshold);
-      unBalancedNodesAccordingToBalancer =
-          containerBalancer.getUnBalancedNodes();
-
-      stopBalancer();
-      Assertions.assertEquals(
-          expectedUnBalancedNodes.size(),
-          unBalancedNodesAccordingToBalancer.size());
-
-      for (int j = 0; j < expectedUnBalancedNodes.size(); j++) {
-        Assertions.assertEquals(
-            expectedUnBalancedNodes.get(j).getDatanodeDetails(),
-            unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails());
-      }
-    }
+    Assertions.assertEquals(doRun, false);
+    containerBalancer.saveConfiguration(balancerConfiguration, true, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, true);
+    containerBalancer.saveConfiguration(balancerConfiguration, false, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, false);
   }
 
-  /**
-   * Checks whether the list of unBalanced nodes is empty when the cluster is
-   * balanced.
-   */
   @Test
-  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(99.99);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(100);
-
-    stopBalancer();
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
-    Assertions.assertEquals(0, metrics.getNumDatanodesUnbalanced());
-  }
-
-  /**
-   * ContainerBalancer should not involve more datanodes than the
-   * maxDatanodesRatioToInvolvePerIteration limit.
-   */
-  @Test
-  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    int percent = 20;
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
-        percent);
-    balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    int number = percent * numberOfNodes / 100;
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertFalse(
-        containerBalancer.getCountDatanodesInvolvedPerIteration() > number);
-    Assertions.assertTrue(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
-    Assertions.assertFalse(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > number);
-    stopBalancer();
-  }
+  public void testStartBalancerStop() throws Exception {
+    Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
+            Mockito.any(DatanodeDetails.class),
+            Mockito.any(DatanodeDetails.class)))
+        .thenReturn(genCompletableFuture(1000));
 
-  @Test
-  public void containerBalancerShouldSelectOnlyClosedContainers()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    // make all containers open, balancer should not select any of them
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
-    }
     balancerConfiguration.setThreshold(10);
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // balancer should have identified unbalanced nodes
-    Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
-    // no container should have been selected
-    Assertions.assertTrue(containerBalancer.getContainerToSourceMap()
-        .isEmpty());
-    /*
-    Iteration result should be CAN_NOT_BALANCE_ANY_MORE because no container
-    move is generated
-     */
-    Assertions.assertEquals(
-        ContainerBalancer.IterationResult.CAN_NOT_BALANCE_ANY_MORE,
-        containerBalancer.getIterationResult());
-
-    // now, close all containers
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.CLOSED);
-    }
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // check whether all selected containers are closed
-    for (ContainerID cid:
-         containerBalancer.getContainerToSourceMap().keySet()) {
-      Assertions.assertSame(
-          cidToInfoMap.get(cid).getState(), HddsProtos.LifeCycleState.CLOSED);
-    }
-  }
-
-  @Test
-  public void containerBalancerShouldObeyMaxSizeToMoveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT);
     balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    // balancer should not have moved more size than the limit
-    Assertions.assertFalse(
-        containerBalancer.getSizeScheduledForMoveInLatestIteration() >
-        10 * STORAGE_UNIT);
-
-    long size = containerBalancer.getMetrics()
-        .getDataSizeMovedGBInLatestIteration();
-    Assertions.assertTrue(size > 0);
-    Assertions.assertFalse(size > 10);
-    stopBalancer();
-  }
-
-  @Test
-  public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
+    balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);
     balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(1000);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> map =
-        containerBalancer.getContainerToTargetMap();
-    for (Map.Entry<ContainerID, DatanodeDetails> entry : map.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails target = entry.getValue();
-      Assertions.assertTrue(cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .noneMatch(target::equals));
-    }
-  }
+    balancerConfiguration.setMoveTimeout(Duration.ofMillis(1000));
 
-  @Test
-  public void containerMoveSelectionShouldFollowPlacementPolicy()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setIterations(1);
     startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> containerFromSourceMap =
-        containerBalancer.getContainerToSourceMap();
-    Map<ContainerID, DatanodeDetails> containerToTargetMap =
-        containerBalancer.getContainerToTargetMap();
-
-    // for each move selection, check if {replicas - source + target}
-    // satisfies placement policy
-    for (Map.Entry<ContainerID, DatanodeDetails> entry :
-        containerFromSourceMap.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails source = entry.getValue();
-
-      List<DatanodeDetails> replicas = cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .collect(Collectors.toList());
-      // remove source and add target
-      replicas.remove(source);
-      replicas.add(containerToTargetMap.get(container));
-
-      ContainerInfo containerInfo = cidToInfoMap.get(container);
-      ContainerPlacementStatus placementStatus =
-          placementPolicy.validateContainerPlacement(replicas,
-              containerInfo.getReplicationConfig().getRequiredNodes());
-      Assertions.assertTrue(placementStatus.isPolicySatisfied());
-    }
-  }
-
-  @Test
-  public void targetDatanodeShouldBeInServiceHealthy()
-      throws NodeNotFoundException, IllegalContainerBalancerStateException,
-      IOException, InvalidContainerBalancerConfigurationException,
-      TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    for (DatanodeDetails target : containerBalancer.getSelectedTargets()) {
-      NodeStatus status = mockNodeManager.getNodeStatus(target);
-      Assertions.assertSame(HddsProtos.NodeOperationalState.IN_SERVICE,
-          status.getOperationalState());
-      Assertions.assertTrue(status.isHealthy());
+    try {
+      containerBalancer.startBalancer(balancerConfiguration);
+    } catch (IllegalContainerBalancerStateException e) {
+      // start failed
+      Assertions.assertTrue(true);

Review Comment:
   > Do we want to assert another variable here?
   
   This verifies behavior: starting the balancer while the task is already running must throw an exception. The state is known to be running, so the expectation is that the exception is thrown.
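If the test should also check the exception itself, JUnit 5's `Assertions.assertThrows(Class<T>, Executable)` returns the thrown exception, so the call could be written as `assertThrows(IllegalContainerBalancerStateException.class, () -> containerBalancer.startBalancer(balancerConfiguration))` and further assertions made on the result. The sketch below is a small JDK-only stand-in with the same shape so that it runs without JUnit; `ExpectThrows` and `ThrowingAction` are hypothetical names.

```java
import java.util.Objects;

/** Hypothetical JDK-only stand-in for JUnit 5's Assertions.assertThrows. */
final class ExpectThrows {

  /** Like Runnable, but allowed to throw checked exceptions. */
  @FunctionalInterface
  interface ThrowingAction {
    void run() throws Exception;
  }

  private ExpectThrows() { }

  /** Runs the action and returns the exception if it has the expected type. */
  static <T extends Throwable> T assertThrows(Class<T> expected, ThrowingAction action) {
    Objects.requireNonNull(expected, "expected");
    try {
      action.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);   // caller can assert on message, cause, etc.
      }
      throw new AssertionError(
          "Expected " + expected.getName() + " but got " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + expected.getName() + " to be thrown");
  }
}
```

Returning the exception is what makes "assert another variable" possible: the caller can check the message or any state carried by the exception.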



[GitHub] [ozone] lokeshj1703 commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r989699725


##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -235,16 +235,16 @@ public void testStartBalancerStop() throws Exception {
     startBalancer(balancerConfiguration);
     try {
       containerBalancer.startBalancer(balancerConfiguration);
+      Assertions.assertTrue(false);

Review Comment:
   Let's also add a message to the assertion, such as "An exception should have been thrown".
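With JUnit 5 the suggested message can be passed directly, for example `Assertions.fail("An exception should have been thrown")` or `Assertions.assertTrue(false, "An exception should have been thrown")`. The JDK-only sketch below shows the same try/fail/catch shape; `FailWithMessage` and `startAgain` are hypothetical stand-ins for the test and for the second `containerBalancer.startBalancer(...)` call.

```java
/** Hypothetical sketch of the try/fail/catch idiom with a descriptive message. */
final class FailWithMessage {

  private FailWithMessage() { }

  /** Stand-in for a second start call on an already running balancer. */
  static void startAgain(boolean alreadyRunning) {
    if (alreadyRunning) {
      throw new IllegalStateException("Balancer is already running");
    }
  }

  /** Returns normally only if startAgain threw the expected exception. */
  static void checkSecondStartFails(boolean alreadyRunning) {
    try {
      startAgain(alreadyRunning);
      // Reached only if no exception was thrown: fail with a readable message.
      throw new AssertionError("An exception should have been thrown");
    } catch (IllegalStateException expected) {
      // Expected path. AssertionError is not an IllegalStateException, so the
      // failure above is never swallowed by this catch block.
    }
  }
}
```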



[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r988751441


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Container balancer is a service in SCM to move containers between over- and
+ * under-utilized datanodes.
+ */
+public class ContainerBalancerTask implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerTask.class);
+
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ReplicationManager replicationManager;
+  private OzoneConfiguration ozoneConfiguration;
+  private ContainerBalancer containerBalancer;
+  private final SCMContext scmContext;
+  private double threshold;
+  private int totalNodesInCluster;
+  private double maxDatanodesRatioToInvolvePerIteration;
+  private long maxSizeToMovePerIteration;
+  private int countDatanodesInvolvedPerIteration;
+  private long sizeScheduledForMoveInLatestIteration;
+  // count actual size moved in bytes
+  private long sizeActuallyMovedInLatestIteration;
+  private int iterations;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
+  private Set<String> excludeNodes;
+  private Set<String> includeNodes;
+  private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private PlacementPolicyValidateProxy placementPolicyValidateProxy;
+  private NetworkTopology networkTopology;
+  private double upperLimit;
+  private double lowerLimit;
+  private ContainerBalancerSelectionCriteria selectionCriteria;
+  private volatile Status taskStatus = Status.RUNNING;
+
+  /*
+  Since a container can be selected only once during an iteration, these maps
+   use it as a primary key to track source to target pairings.
+  */
+  private final Map<ContainerID, DatanodeDetails> containerToSourceMap;
+  private final Map<ContainerID, DatanodeDetails> containerToTargetMap;
+
+  private Set<DatanodeDetails> selectedTargets;
+  private Set<DatanodeDetails> selectedSources;
+  private FindTargetStrategy findTargetStrategy;
+  private FindSourceStrategy findSourceStrategy;
+  private Map<ContainerMoveSelection,
+      CompletableFuture<LegacyReplicationManager.MoveResult>>
+      moveSelectionToFutureMap;
+  private IterationResult iterationResult;
+  private int nextIterationIndex;
+
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * ContainerBalancerMetrics. Container Balancer does not start on
+   * construction.
+   *
+   * @param scm the storage container manager
+   */
+  public ContainerBalancerTask(StorageContainerManager scm,
+                               int nextIterationIndex,
+                               ContainerBalancer containerBalancer,
+                               ContainerBalancerMetrics metrics,
+                               ContainerBalancerConfiguration config) {
+    this.nodeManager = scm.getScmNodeManager();
+    this.containerManager = scm.getContainerManager();
+    this.replicationManager = scm.getReplicationManager();
+    this.ozoneConfiguration = scm.getConfiguration();
+    this.containerBalancer = containerBalancer;
+    this.config = config;
+    this.metrics = metrics;
+    this.scmContext = scm.getScmContext();
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy();
+    this.networkTopology = scm.getClusterMap();
+    this.nextIterationIndex = nextIterationIndex;
+    this.containerToSourceMap = new HashMap<>();
+    this.containerToTargetMap = new HashMap<>();
+    this.selectedSources = new HashSet<>();
+    this.selectedTargets = new HashSet<>();
+    findSourceStrategy = new FindSourceGreedy(nodeManager);
+  }
+
+  /**
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
+   */
+  public void run() {
+    try {
+      balancer();
+
+      // mark balancer completion, if ha and stopped as not a leader, it will
+      // be handled inside for not saving configuration with leader check
+      saveConfiguration(config, false, 0);
+    } catch (Throwable e) {
+      LOG.error("Container Balancer is stopped abnormally, ", e);
+    }
+    taskStatus = Status.STOPPED;
+  }
+
+  /**
+   * Tries to stop ContainerBalancer changing status to stopping. Calls
+   * {@link ContainerBalancerTask#stop()}.
+   */
+  public void stop() {
+    taskStatus = Status.STOPPING;
+  }
+
+  private void balancer() {
+    this.iterations = config.getIterations();
+    if (this.iterations == -1) {
+      //run balancer infinitely
+      this.iterations = Integer.MAX_VALUE;
+    }
+
+    // nextIterationIndex is the iteration that balancer should start from on
+    // leader change or restart
+    int i = nextIterationIndex;
+    for (; i < iterations && isBalancerRunning(); i++) {
+      // reset some variables and metrics for this iteration
+      resetState();
+
+      if (!isBalancerRunning()) {
+        return;
+      }
+
+      if (config.getTriggerDuEnable()) {
+        // before starting a new iteration, we trigger all the datanode
+        // to run `du`. this is an aggressive action, with which we can
+        // get more precise usage info of all datanodes before moving.
+        // this is helpful for container balancer to make more appropriate
+        // decisions. this will increase the disk io load of data nodes, so
+        // please enable it with caution.
+        nodeManager.refreshAllHealthyDnUsageInfo();
+        try {
+          long nodeReportInterval =
+              ozoneConfiguration.getTimeDuration(HDDS_NODE_REPORT_INTERVAL,
+                  HDDS_NODE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+          // one for sending command , one for running du, and one for
+          // reporting back make it like this for now, a more suitable
+          // value. can be set in the future if needed
+          wait(3 * nodeReportInterval);

Review Comment:
   > I think it is a requirement to have wait inside synchronized block.
   
   I could not find a use case that requires a synchronized block here, since the task runs only once. I searched all usages in the existing code.
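For reference on this thread: in Java, `Object.wait()` must be called by a thread that owns the object's monitor, otherwise it throws `IllegalMonitorStateException` at runtime, regardless of how many threads use the object. Below is a hypothetical sketch of a delay like `wait(3 * nodeReportInterval)` that a concurrent stop can cut short: the wait runs inside a `synchronized` method, loops to handle spurious wakeups, and `stop()` calls `notifyAll()`. `StoppableDelay` is an illustrative name, not an Ozone class.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a delay that a concurrent stop() can cut short. */
final class StoppableDelay {
  private boolean stopped;   // guarded by "this"

  /** Waits up to delayMillis; returns true if it elapsed, false if stopped or interrupted. */
  synchronized boolean await(long delayMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    while (!stopped) {
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return true;
      }
      try {
        // Legal: this thread owns the monitor of "this" inside a synchronized method.
        TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();   // preserve the interrupt for callers
        return false;
      }
    }
    return false;
  }

  /** Marks the delay as stopped and wakes any waiting thread. */
  synchronized void stop() {
    stopped = true;
    notifyAll();
  }

  /** Shows the failure mode: wait() without owning the monitor. */
  static boolean waitWithoutMonitorFails() {
    try {
      new Object().wait(1);   // not inside synchronized (obj)
      return false;
    } catch (IllegalMonitorStateException e) {
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

The loop re-checks both the flag and the deadline after every wakeup, so a spurious wakeup neither ends the delay early nor hides a stop request.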



[GitHub] [ozone] lokeshj1703 commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r988878472


##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,102 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
-  public void testCalculationOfUtilization() {
-    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
-    for (int i = 0; i < nodesInCluster.size(); i++) {
-      Assertions.assertEquals(nodeUtilizations.get(i),
-          nodesInCluster.get(i).calculateUtilization(), 0.0001);
-    }
-
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster
-    Assertions.assertEquals(averageUtilization,
-        containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001);
-  }
-
-  /**
-   * Checks whether ContainerBalancer is correctly updating the list of
-   * unBalanced nodes with varying values of Threshold.
-   */
-  @Test
-  public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    List<DatanodeUsageInfo> expectedUnBalancedNodes;
-    List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
-
-    // check for random threshold values
-    for (int i = 0; i < 50; i++) {
-      double randomThreshold = RANDOM.nextDouble() * 100;
-
-      balancerConfiguration.setThreshold(randomThreshold);
-      startBalancer(balancerConfiguration);
-
-      // waiting for balance completed.
-      // TODO: this is a temporary implementation for now
-      // modify this after balancer is fully completed
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) { }
-
-      expectedUnBalancedNodes =
-          determineExpectedUnBalancedNodes(randomThreshold);
-      unBalancedNodesAccordingToBalancer =
-          containerBalancer.getUnBalancedNodes();
-
-      stopBalancer();
-      Assertions.assertEquals(
-          expectedUnBalancedNodes.size(),
-          unBalancedNodesAccordingToBalancer.size());
-
-      for (int j = 0; j < expectedUnBalancedNodes.size(); j++) {
-        Assertions.assertEquals(
-            expectedUnBalancedNodes.get(j).getDatanodeDetails(),
-            unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails());
-      }
-    }
+    Assertions.assertEquals(doRun, false);
+    containerBalancer.saveConfiguration(balancerConfiguration, true, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, true);
+    containerBalancer.saveConfiguration(balancerConfiguration, false, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, false);
   }
 
-  /**
-   * Checks whether the list of unBalanced nodes is empty when the cluster is
-   * balanced.
-   */
   @Test
-  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(99.99);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(100);
-
-    stopBalancer();
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
-    Assertions.assertEquals(0, metrics.getNumDatanodesUnbalanced());
-  }
-
-  /**
-   * ContainerBalancer should not involve more datanodes than the
-   * maxDatanodesRatioToInvolvePerIteration limit.
-   */
-  @Test
-  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    int percent = 20;
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
-        percent);
-    balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    int number = percent * numberOfNodes / 100;
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertFalse(
-        containerBalancer.getCountDatanodesInvolvedPerIteration() > number);
-    Assertions.assertTrue(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
-    Assertions.assertFalse(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > number);
-    stopBalancer();
-  }
+  public void testStartBalancerStop() throws Exception {
+    Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
+            Mockito.any(DatanodeDetails.class),
+            Mockito.any(DatanodeDetails.class)))
+        .thenReturn(genCompletableFuture(1000));
 
-  @Test
-  public void containerBalancerShouldSelectOnlyClosedContainers()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    // make all containers open, balancer should not select any of them
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
-    }
     balancerConfiguration.setThreshold(10);
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // balancer should have identified unbalanced nodes
-    Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
-    // no container should have been selected
-    Assertions.assertTrue(containerBalancer.getContainerToSourceMap()
-        .isEmpty());
-    /*
-    Iteration result should be CAN_NOT_BALANCE_ANY_MORE because no container
-    move is generated
-     */
-    Assertions.assertEquals(
-        ContainerBalancer.IterationResult.CAN_NOT_BALANCE_ANY_MORE,
-        containerBalancer.getIterationResult());
-
-    // now, close all containers
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.CLOSED);
-    }
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // check whether all selected containers are closed
-    for (ContainerID cid:
-         containerBalancer.getContainerToSourceMap().keySet()) {
-      Assertions.assertSame(
-          cidToInfoMap.get(cid).getState(), HddsProtos.LifeCycleState.CLOSED);
-    }
-  }
-
-  @Test
-  public void containerBalancerShouldObeyMaxSizeToMoveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT);
     balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    // balancer should not have moved more size than the limit
-    Assertions.assertFalse(
-        containerBalancer.getSizeScheduledForMoveInLatestIteration() >
-        10 * STORAGE_UNIT);
-
-    long size = containerBalancer.getMetrics()
-        .getDataSizeMovedGBInLatestIteration();
-    Assertions.assertTrue(size > 0);
-    Assertions.assertFalse(size > 10);
-    stopBalancer();
-  }
-
-  @Test
-  public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
+    balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);
     balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(1000);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> map =
-        containerBalancer.getContainerToTargetMap();
-    for (Map.Entry<ContainerID, DatanodeDetails> entry : map.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails target = entry.getValue();
-      Assertions.assertTrue(cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .noneMatch(target::equals));
-    }
-  }
+    balancerConfiguration.setMoveTimeout(Duration.ofMillis(1000));
 
-  @Test
-  public void containerMoveSelectionShouldFollowPlacementPolicy()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setIterations(1);
     startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> containerFromSourceMap =
-        containerBalancer.getContainerToSourceMap();
-    Map<ContainerID, DatanodeDetails> containerToTargetMap =
-        containerBalancer.getContainerToTargetMap();
-
-    // for each move selection, check if {replicas - source + target}
-    // satisfies placement policy
-    for (Map.Entry<ContainerID, DatanodeDetails> entry :
-        containerFromSourceMap.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails source = entry.getValue();
-
-      List<DatanodeDetails> replicas = cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .collect(Collectors.toList());
-      // remove source and add target
-      replicas.remove(source);
-      replicas.add(containerToTargetMap.get(container));
-
-      ContainerInfo containerInfo = cidToInfoMap.get(container);
-      ContainerPlacementStatus placementStatus =
-          placementPolicy.validateContainerPlacement(replicas,
-              containerInfo.getReplicationConfig().getRequiredNodes());
-      Assertions.assertTrue(placementStatus.isPolicySatisfied());
-    }
-  }
-
-  @Test
-  public void targetDatanodeShouldBeInServiceHealthy()
-      throws NodeNotFoundException, IllegalContainerBalancerStateException,
-      IOException, InvalidContainerBalancerConfigurationException,
-      TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    for (DatanodeDetails target : containerBalancer.getSelectedTargets()) {
-      NodeStatus status = mockNodeManager.getNodeStatus(target);
-      Assertions.assertSame(HddsProtos.NodeOperationalState.IN_SERVICE,
-          status.getOperationalState());
-      Assertions.assertTrue(status.isHealthy());
+    try {
+      containerBalancer.startBalancer(balancerConfiguration);
+    } catch (IllegalContainerBalancerStateException e) {
+      // start failed
+      Assertions.assertTrue(true);

Review Comment:
   I think we can achieve this by asserting false right after the `startBalancer` call, to make sure the exception is actually thrown.
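   A minimal plain-Java sketch of this suggestion, assuming a hypothetical `FakeBalancer` whose second `startBalancer()` call throws a hypothetical `IllegalBalancerStateException` (a stand-in for `IllegalContainerBalancerStateException`). Failing explicitly right after the call means the test cannot pass silently when no exception is thrown, which an `assertTrue(true)` inside the catch block does not guarantee.

```java
// Hypothetical stand-in for IllegalContainerBalancerStateException.
class IllegalBalancerStateException extends Exception {
  IllegalBalancerStateException(String msg) {
    super(msg);
  }
}

// Hypothetical balancer that rejects a start while already running.
class FakeBalancer {
  private boolean running;

  void startBalancer() throws IllegalBalancerStateException {
    if (running) {
      throw new IllegalBalancerStateException("already running");
    }
    running = true;
  }
}

class StartTwiceCheck {
  // Returns true only if the second start was rejected with the exception.
  static boolean secondStartRejected() {
    FakeBalancer balancer = new FakeBalancer();
    try {
      balancer.startBalancer();
    } catch (IllegalBalancerStateException e) {
      return false; // the first start must succeed
    }
    try {
      balancer.startBalancer();
      // Reaching this line means no exception was thrown: fail explicitly.
      throw new AssertionError("expected IllegalBalancerStateException");
    } catch (IllegalBalancerStateException e) {
      return true;
    }
  }
}
```

   With JUnit 5 the same check fits in one call: `Assertions.assertThrows(IllegalContainerBalancerStateException.class, () -> containerBalancer.startBalancer(balancerConfiguration));`.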




[GitHub] [ozone] siddhantsangwan commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r989971036


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -892,34 +79,38 @@ private void resetState() {
    */
   @Override
   public void notifyStatusChanged() {
-    boolean shouldStop = false;
-    boolean shouldRun = false;
+    if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+      boolean shouldStop;
+      lock.lock();
+      try {
+        shouldStop = canBalancerStop();
+      } finally {
+        lock.unlock();
+      }
+      if (shouldStop) {
+        LOG.info("Stopping ContainerBalancer in this scm on status change");
+        stop();
+      }
+      return;
+    }
+
     lock.lock();
     try {
-      if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
-        shouldStop = isBalancerRunning();
-      } else {
-        shouldRun = shouldRun();
+      // else check for start
+      boolean shouldRun = shouldRun();
+      if (shouldRun && canBalancerStart()) {

Review Comment:
   I'm wondering if there's a case for which we need the `canBalancerStart()` check here. Is it possible that balancer would already be running in the new leader SCM or when an SCM gets out of safe mode?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1072,15 +295,15 @@ private void validateState(boolean expectedRunning)
       LOG.warn("SCM is in safe mode");
       throw new IllegalContainerBalancerStateException("SCM is in safe mode");
     }
-    lock.lock();
-    try {
-      if (isBalancerRunning() != expectedRunning) {
-        throw new IllegalContainerBalancerStateException(
-            "Expect ContainerBalancer running state to be " + expectedRunning +
-                ", but running state is actually " + isBalancerRunning());
-      }
-    } finally {
-      lock.unlock();
+    if (!expectedRunning && !canBalancerStart()) {
+      throw new IllegalContainerBalancerStateException(
+          "Expect ContainerBalancer as not running state" +
+              ", but running state is actually " + getBalancerStatus());
+    }
+    if (expectedRunning && !canBalancerStop()) {
+      throw new IllegalContainerBalancerStateException(
+          "Expect ContainerBalancer as running state" +
+              ", but running state is actually " + getBalancerStatus());

Review Comment:
   Do we need this change? It seems unnecessary.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1137,45 +351,21 @@ public void stopBalancer()
       TimeoutException {
     lock.lock();
     try {
-      // should be leader, out of safe mode, and currently running
       validateState(true);
-      saveConfiguration(config, false, 0);

Review Comment:
   I think we should persist to the DB that the balancer should not run before stopping it.
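   A minimal sketch of the ordering proposed here, assuming a hypothetical in-memory `ConfigStore` in place of the SCM DB and a hypothetical `StoppableBalancer`; neither is Ozone's API. The "should not run" flag is persisted before the running task is signalled, so a leader change or restart that happens mid-stop reads `shouldRun == false` and does not resume balancing.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical persistent store; stands in for saveConfiguration(...).
class ConfigStore {
  private volatile boolean shouldRun;

  void saveShouldRun(boolean value) {
    shouldRun = value;
  }

  boolean shouldRun() {
    return shouldRun;
  }
}

// Hypothetical balancer front end illustrating persist-then-stop.
class StoppableBalancer {
  private final ConfigStore store;
  private final AtomicBoolean running = new AtomicBoolean(false);

  StoppableBalancer(ConfigStore store) {
    this.store = store;
  }

  void start() {
    store.saveShouldRun(true);
    running.set(true);
  }

  void stop() {
    // 1. Persist the intent first, so a restart cannot revive the balancer.
    store.saveShouldRun(false);
    // 2. Then signal the running task to stop.
    running.set(false);
  }

  boolean isRunning() {
    return running.get();
  }
}
```

   In the merged change the task itself calls `saveConfiguration(config, false, 0)` when `run()` finishes, as shown in the `ContainerBalancerTask` diff; the sketch only illustrates the alternative ordering.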



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -950,10 +141,41 @@ public boolean shouldRun() {
   /**
    * Checks if ContainerBalancer is currently running in this SCM.
    *
-   * @return true if the currentBalancingThread is not null, otherwise false
+   * @return true if balancer started, otherwise false
    */
   public boolean isBalancerRunning() {
-    return currentBalancingThread != null;
+    return (null != task
+        && task.getBalancerStatus() == ContainerBalancerTask.Status.RUNNING);
+  }
+
+  /**
+   * Checks if ContainerBalancer in valid state and can be started.
+   *
+   * @return true if balancer can be started, otherwise false
+   */
+  private boolean canBalancerStart() {
+    return (null == task
+        || task.getBalancerStatus() == ContainerBalancerTask.Status.STOPPED);
+  }
+
+  /**
+   * get the Container Balancer state.
+   *
+   * @return true if balancer started, otherwise false
+   */
+  public ContainerBalancerTask.Status getBalancerStatus() {
+    return null != task ? task.getBalancerStatus()
+        : ContainerBalancerTask.Status.STOPPED;
+  }
+
+  /**
+   * Checks if ContainerBalancer is in valid state to call stop.
+   *
+   * @return true if balancer can be stopped, otherwise false
+   */
+  private boolean canBalancerStop() {
+    return null != task
+        && task.getBalancerStatus() == ContainerBalancerTask.Status.RUNNING;
   }

Review Comment:
   This method seems redundant. `isBalancerRunning` can be used instead. If we want to have `canBalancerStop` for better readability, let's just call `isBalancerRunning` inside it?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -892,34 +79,38 @@ private void resetState() {
    */
   @Override
   public void notifyStatusChanged() {
-    boolean shouldStop = false;
-    boolean shouldRun = false;
+    if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+      boolean shouldStop;
+      lock.lock();
+      try {
+        shouldStop = canBalancerStop();
+      } finally {
+        lock.unlock();
+      }
+      if (shouldStop) {
+        LOG.info("Stopping ContainerBalancer in this scm on status change");
+        stop();
+      }
+      return;
+    }
+
     lock.lock();

Review Comment:
   Any reason for acquiring the lock twice instead of once like before?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1092,33 +315,24 @@ public void stop() {
     Thread balancingThread;
     lock.lock();
     try {
-      if (!isBalancerRunning()) {
-        LOG.warn("Cannot stop Container Balancer because it's not running");
+      if (!canBalancerStop()) {
+        LOG.warn("Cannot stop Container Balancer because it's not running or " +
+            "stopping");
         return;
       }
+      task.stop();
       balancingThread = currentBalancingThread;

Review Comment:
   NIT: Since we're not setting `currentBalancingThread` to null now, we don't need the temporary variable `balancingThread`.
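   A minimal sketch of a stop path that avoids hanging, using a hypothetical `BalancerService` rather than Ozone's `ContainerBalancer`: the stop flag is set and the thread reference captured under the lock, and `join()` runs outside the lock so a task that needs the lock to finish cannot deadlock against `stop()`. `start()` refuses to launch a new thread while the previous one is still alive, which keeps a start, stop, start sequence from leaving a stopping thread behind.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the balancer's start/stop path; not Ozone's code.
class BalancerService {
  private final ReentrantLock lock = new ReentrantLock();
  private volatile boolean stopRequested;
  private Thread currentBalancingThread; // guarded by lock

  void start() {
    lock.lock();
    try {
      if (currentBalancingThread != null && currentBalancingThread.isAlive()) {
        return; // previous task (running or still stopping) has not exited
      }
      stopRequested = false;
      currentBalancingThread = new Thread(() -> {
        while (!stopRequested) {
          try {
            Thread.sleep(5); // placeholder for one balancing iteration
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      });
      currentBalancingThread.start();
    } finally {
      lock.unlock();
    }
  }

  void stop() {
    Thread balancingThread;
    lock.lock();
    try {
      if (currentBalancingThread == null) {
        return;
      }
      stopRequested = true; // ask the task to exit its loop
      balancingThread = currentBalancingThread; // the thread just signalled
    } finally {
      lock.unlock();
    }
    try {
      // Join outside the lock so a task that needs the lock can still finish.
      balancingThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  boolean isThreadAlive() {
    lock.lock();
    try {
      return currentBalancingThread != null && currentBalancingThread.isAlive();
    } finally {
      lock.unlock();
    }
  }
}
```

   Capturing the reference under the lock guarantees that `join()` waits for the thread that was actually signalled. Reading `currentBalancingThread` directly, as the nit suggests, is equivalent as long as no `start()` can replace the field before the join returns.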




[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r990380331


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -950,10 +141,41 @@ public boolean shouldRun() {
   /**
    * Checks if ContainerBalancer is currently running in this SCM.
    *
-   * @return true if the currentBalancingThread is not null, otherwise false
+   * @return true if balancer started, otherwise false
    */
   public boolean isBalancerRunning() {
-    return currentBalancingThread != null;
+    return (null != task
+        && task.getBalancerStatus() == ContainerBalancerTask.Status.RUNNING);
+  }
+
+  /**
+   * Checks if ContainerBalancer in valid state and can be started.
+   *
+   * @return true if balancer can be started, otherwise false
+   */
+  private boolean canBalancerStart() {
+    return (null == task
+        || task.getBalancerStatus() == ContainerBalancerTask.Status.STOPPED);
+  }
+
+  /**
+   * get the Container Balancer state.
+   *
+   * @return true if balancer started, otherwise false
+   */
+  public ContainerBalancerTask.Status getBalancerStatus() {
+    return null != task ? task.getBalancerStatus()
+        : ContainerBalancerTask.Status.STOPPED;
+  }
+
+  /**
+   * Checks if ContainerBalancer is in valid state to call stop.
+   *
+   * @return true if balancer can be stopped, otherwise false
+   */
+  private boolean canBalancerStop() {
+    return null != task
+        && task.getBalancerStatus() == ContainerBalancerTask.Status.RUNNING;
   }

Review Comment:
   There is a state machine,




[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r990391786


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -950,10 +141,41 @@ public boolean shouldRun() {
   /**
    * Checks if ContainerBalancer is currently running in this SCM.
    *
-   * @return true if the currentBalancingThread is not null, otherwise false
+   * @return true if balancer started, otherwise false
    */
   public boolean isBalancerRunning() {
-    return currentBalancingThread != null;
+    return (null != task
+        && task.getBalancerStatus() == ContainerBalancerTask.Status.RUNNING);
+  }
+
+  /**
+   * Checks if ContainerBalancer in valid state and can be started.
+   *
+   * @return true if balancer can be started, otherwise false
+   */
+  private boolean canBalancerStart() {
+    return (null == task
+        || task.getBalancerStatus() == ContainerBalancerTask.Status.STOPPED);
+  }
+
+  /**
+   * get the Container Balancer state.
+   *
+   * @return true if balancer started, otherwise false
+   */
+  public ContainerBalancerTask.Status getBalancerStatus() {
+    return null != task ? task.getBalancerStatus()
+        : ContainerBalancerTask.Status.STOPPED;
+  }
+
+  /**
+   * Checks if ContainerBalancer is in valid state to call stop.
+   *
+   * @return true if balancer can be stopped, otherwise false
+   */
+  private boolean canBalancerStop() {
+    return null != task
+        && task.getBalancerStatus() == ContainerBalancerTask.Status.RUNNING;
   }

Review Comment:
   1. The configuration is saved by `ContainerBalancerTask` when it stops, since only the task can tell that it has actually stopped.
   2. `isBalancerRunning` only checks for the RUNNING state, whereas `canBalancerStop` is also meant to cover the STOPPING state and disallow a repeated stop call while stopping.
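   A minimal sketch of the RUNNING, STOPPING, STOPPED state machine under discussion, using a hypothetical `BalancerStateMachine` rather than Ozone's classes. It also follows the earlier suggestion of implementing `canBalancerStop()` in terms of `isBalancerRunning()`. Rejecting `start()` while the state is STOPPING is what prevents a start, stop, start sequence from launching a second task while the first is still winding down.

```java
// Hypothetical mirror of ContainerBalancerTask.Status; not Ozone's code.
enum Status { RUNNING, STOPPING, STOPPED }

class BalancerStateMachine {
  private Status status = Status.STOPPED; // guarded by this

  synchronized Status getBalancerStatus() {
    return status;
  }

  synchronized boolean isBalancerRunning() {
    return status == Status.RUNNING;
  }

  // A new task may start only after the previous one has fully stopped.
  synchronized boolean canBalancerStart() {
    return status == Status.STOPPED;
  }

  // Stop is accepted only from RUNNING, so a second stop during STOPPING
  // is rejected; delegating keeps the two checks from drifting apart.
  synchronized boolean canBalancerStop() {
    return isBalancerRunning();
  }

  // STOPPED -> RUNNING
  synchronized boolean start() {
    if (!canBalancerStart()) {
      return false;
    }
    status = Status.RUNNING;
    return true;
  }

  // RUNNING -> STOPPING
  synchronized boolean requestStop() {
    if (!canBalancerStop()) {
      return false;
    }
    status = Status.STOPPING;
    return true;
  }

  // -> STOPPED; called by the task thread when its run() method exits.
  synchronized void markStopped() {
    status = Status.STOPPED;
  }
}
```

   `synchronized` makes each check-then-transition atomic, so two callers cannot both observe STOPPED and both start.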




[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r990387914


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -892,34 +79,38 @@ private void resetState() {
    */
   @Override
   public void notifyStatusChanged() {
-    boolean shouldStop = false;
-    boolean shouldRun = false;
+    if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+      boolean shouldStop;
+      lock.lock();
+      try {
+        shouldStop = canBalancerStop();
+      } finally {
+        lock.unlock();
+      }
+      if (shouldStop) {
+        LOG.info("Stopping ContainerBalancer in this scm on status change");
+        stop();
+      }
+      return;
+    }
+
     lock.lock();

Review Comment:
   There is no change in logic here:
   1. Earlier, a single locked section decided whether to stop or start. Stop was then called outside the lock, and so was start.
   
   2. Now there are two separate checks. First, the stop condition is checked within the lock, and stop is then called outside it.
       Next, the start condition is checked within the lock, and start is also performed within the lock to avoid concurrency issues.
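   A minimal sketch of the locking pattern described above, using a hypothetical `StatusChangeHandler` whose `running` flag stands in for the task state and whose constructor argument stands in for the persisted `shouldRun()`. The stop decision is made under the lock but `stop()` runs without it (a real stop may block on `join()`), while the start check and the start itself share one critical section.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the notifyStatusChanged() pattern; not Ozone's code.
class StatusChangeHandler {
  private final ReentrantLock lock = new ReentrantLock();
  private final boolean shouldRun; // stand-in for the persisted shouldRun()
  private boolean running;         // guarded by lock
  private int starts;              // guarded by lock
  private int stops;               // guarded by lock

  StatusChangeHandler(boolean shouldRun) {
    this.shouldRun = shouldRun;
  }

  void notifyStatusChanged(boolean isLeader, boolean inSafeMode) {
    if (!isLeader || inSafeMode) {
      boolean shouldStop;
      lock.lock();
      try {
        shouldStop = running; // decide under the lock
      } finally {
        lock.unlock();
      }
      if (shouldStop) {
        stop(); // called without holding the lock
      }
      return;
    }
    lock.lock();
    try {
      // Check and start together, so two notifications cannot both start.
      if (shouldRun && !running) {
        running = true;
        starts++;
      }
    } finally {
      lock.unlock();
    }
  }

  void stop() {
    lock.lock();
    try {
      if (!running) {
        return; // state may have changed since the caller's check
      }
      running = false;
      stops++;
    } finally {
      lock.unlock();
    }
  }

  int getStarts() {
    lock.lock();
    try {
      return starts;
    } finally {
      lock.unlock();
    }
  }

  int getStops() {
    lock.lock();
    try {
      return stops;
    } finally {
      lock.unlock();
    }
  }
}
```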
   




[GitHub] [ozone] siddhantsangwan merged pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
siddhantsangwan merged PR #3782:
URL: https://github.com/apache/ozone/pull/3782



[GitHub] [ozone] lokeshj1703 commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r988879064


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Container balancer is a service in SCM to move containers between over- and
+ * under-utilized datanodes.
+ */
+public class ContainerBalancerTask implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerTask.class);
+
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ReplicationManager replicationManager;
+  private OzoneConfiguration ozoneConfiguration;
+  private ContainerBalancer containerBalancer;
+  private final SCMContext scmContext;
+  private double threshold;
+  private int totalNodesInCluster;
+  private double maxDatanodesRatioToInvolvePerIteration;
+  private long maxSizeToMovePerIteration;
+  private int countDatanodesInvolvedPerIteration;
+  private long sizeScheduledForMoveInLatestIteration;
+  // count actual size moved in bytes
+  private long sizeActuallyMovedInLatestIteration;
+  private int iterations;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
+  private Set<String> excludeNodes;
+  private Set<String> includeNodes;
+  private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private PlacementPolicyValidateProxy placementPolicyValidateProxy;
+  private NetworkTopology networkTopology;
+  private double upperLimit;
+  private double lowerLimit;
+  private ContainerBalancerSelectionCriteria selectionCriteria;
+  private volatile Status taskStatus = Status.RUNNING;
+
+  /*
+  Since a container can be selected only once during an iteration, these maps
+   use it as a primary key to track source to target pairings.
+  */
+  private final Map<ContainerID, DatanodeDetails> containerToSourceMap;
+  private final Map<ContainerID, DatanodeDetails> containerToTargetMap;
+
+  private Set<DatanodeDetails> selectedTargets;
+  private Set<DatanodeDetails> selectedSources;
+  private FindTargetStrategy findTargetStrategy;
+  private FindSourceStrategy findSourceStrategy;
+  private Map<ContainerMoveSelection,
+      CompletableFuture<LegacyReplicationManager.MoveResult>>
+      moveSelectionToFutureMap;
+  private IterationResult iterationResult;
+  private int nextIterationIndex;
+
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * ContainerBalancerMetrics. Container Balancer does not start on
+   * construction.
+   *
+   * @param scm the storage container manager
+   */
+  public ContainerBalancerTask(StorageContainerManager scm,
+                               int nextIterationIndex,
+                               ContainerBalancer containerBalancer,
+                               ContainerBalancerMetrics metrics,
+                               ContainerBalancerConfiguration config) {
+    this.nodeManager = scm.getScmNodeManager();
+    this.containerManager = scm.getContainerManager();
+    this.replicationManager = scm.getReplicationManager();
+    this.ozoneConfiguration = scm.getConfiguration();
+    this.containerBalancer = containerBalancer;
+    this.config = config;
+    this.metrics = metrics;
+    this.scmContext = scm.getScmContext();
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy();
+    this.networkTopology = scm.getClusterMap();
+    this.nextIterationIndex = nextIterationIndex;
+    this.containerToSourceMap = new HashMap<>();
+    this.containerToTargetMap = new HashMap<>();
+    this.selectedSources = new HashSet<>();
+    this.selectedTargets = new HashSet<>();
+    findSourceStrategy = new FindSourceGreedy(nodeManager);
+  }
+
+  /**
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
+   */
+  public void run() {
+    try {
+      balancer();
+
+      // mark balancer completion, if ha and stopped as not a leader, it will
+      // be handled inside for not saving configuration with leader check
+      saveConfiguration(config, false, 0);
+    } catch (Throwable e) {
+      LOG.error("Container Balancer is stopped abnormally, ", e);
+    }
+    taskStatus = Status.STOPPED;
+  }
+
+  /**
+   * Tries to stop ContainerBalancer changing status to stopping. Calls
+   * {@link ContainerBalancerTask#stop()}.
+   */
+  public void stop() {
+    taskStatus = Status.STOPPING;
+  }
+
+  private void balancer() {
+    this.iterations = config.getIterations();
+    if (this.iterations == -1) {
+      //run balancer infinitely
+      this.iterations = Integer.MAX_VALUE;
+    }
+
+    // nextIterationIndex is the iteration that balancer should start from on
+    // leader change or restart
+    int i = nextIterationIndex;
+    for (; i < iterations && isBalancerRunning(); i++) {
+      // reset some variables and metrics for this iteration
+      resetState();
+
+      if (!isBalancerRunning()) {
+        return;
+      }
+
+      if (config.getTriggerDuEnable()) {
+        // before starting a new iteration, we trigger all the datanode
+        // to run `du`. this is an aggressive action, with which we can
+        // get more precise usage info of all datanodes before moving.
+        // this is helpful for container balancer to make more appropriate
+        // decisions. this will increase the disk io load of data nodes, so
+        // please enable it with caution.
+        nodeManager.refreshAllHealthyDnUsageInfo();
+        try {
+          long nodeReportInterval =
+              ozoneConfiguration.getTimeDuration(HDDS_NODE_REPORT_INTERVAL,
+                  HDDS_NODE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+          // one for sending command , one for running du, and one for
+          // reporting back make it like this for now, a more suitable
+          // value. can be set in the future if needed
+          wait(3 * nodeReportInterval);

Review Comment:
   I see. I think it is advisable to use `wait` together with `notify`. We can skip it for this implementation.
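The reply above refers to pairing `wait` with `notify`. Here is a rough, self-contained sketch of that pattern. It is not the PR's code, and `InterruptibleWaitTask` and `WaitNotifyDemo` are hypothetical names. A task that pauses with a timed `wait()` can stop promptly if `stop()` sets a flag under the same monitor and calls `notifyAll()`:

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical task that pauses with wait() (as the du-trigger step does)
 * and can be woken early by stop() via notifyAll().
 */
class InterruptibleWaitTask implements Runnable {
  private final long waitMillis;
  // Guarded by the task's own monitor ("this").
  private boolean stopRequested = false;
  private volatile boolean finished = false;

  InterruptibleWaitTask(long waitMillis) {
    this.waitMillis = waitMillis;
  }

  @Override
  public void run() {
    // wait() may only be called while holding the monitor it waits on.
    synchronized (this) {
      long deadline = System.nanoTime()
          + TimeUnit.MILLISECONDS.toNanos(waitMillis);
      // Loop to tolerate spurious wakeups: exit on timeout or stop request.
      while (!stopRequested) {
        long remaining = TimeUnit.NANOSECONDS.toMillis(
            deadline - System.nanoTime());
        if (remaining <= 0) {
          break;
        }
        try {
          wait(remaining);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    finished = true;
  }

  /** Sets the stop flag under the monitor and wakes the waiting thread. */
  synchronized void stop() {
    stopRequested = true;
    notifyAll();
  }

  boolean isFinished() {
    return finished;
  }
}

class WaitNotifyDemo {
  /** Starts a task with a long wait, stops it early, returns elapsed ms. */
  static long runAndStop(long waitMillis, long stopAfterMillis) {
    InterruptibleWaitTask task = new InterruptibleWaitTask(waitMillis);
    Thread thread = new Thread(task, "wait-notify-sketch");
    long start = System.nanoTime();
    thread.start();
    try {
      Thread.sleep(stopAfterMillis);
      task.stop();
      thread.join(10_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }
}
```

`stopRequested` is only read and written while holding the monitor. As a result, a `stop()` that arrives before the task starts waiting is not lost: the loop sees the flag and never waits. Without the `notifyAll()`, a stop request would only take effect after the full timeout, which is three times the node report interval in the quoted diff.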



[GitHub] [ozone] sumitagrawl commented on pull request #3782: HDDS-7214 Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on PR #3782:
URL: https://github.com/apache/ozone/pull/3782#issuecomment-1259015932

   @lokeshj1703 @siddhantsangwan @symious please review.


[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r990387914


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -892,34 +79,38 @@ private void resetState() {
    */
   @Override
   public void notifyStatusChanged() {
-    boolean shouldStop = false;
-    boolean shouldRun = false;
+    if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+      boolean shouldStop;
+      lock.lock();
+      try {
+        shouldStop = canBalancerStop();
+      } finally {
+        lock.unlock();
+      }
+      if (shouldStop) {
+        LOG.info("Stopping ContainerBalancer in this scm on status change");
+        stop();
+      }
+      return;
+    }
+
     lock.lock();

Review Comment:
   There is no change in logic here; this is just a refactoring to handle concurrent start calls.
   
   1. Earlier, the decision to stop or start was made inside the first lock, and both stop and start were then called outside the lock.
   
   2. Now there are two separate checks. The first checks, within the lock, whether the balancer should stop; if so, stop is called outside the lock.
      The second checks, within the lock, whether the balancer should start, and performs the start while still holding the lock to avoid concurrent starts.
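The two-step flow described in this reply can be sketched roughly as follows. This is a simplified model, not the PR's code. `BalancerLifecycleSketch` is hypothetical, and its `running` flag, `savedShouldRun` flag and counters are stand-ins for the balancer status, the persisted configuration, and the real thread start/stop:

```java
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical model of the notifyStatusChanged() flow: the stop decision is
 * made under the lock and stop() runs outside it, while the start check and
 * the start itself happen under a single lock hold.
 */
class BalancerLifecycleSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private boolean running = false;              // guarded by lock
  private final boolean savedShouldRun = true;  // stand-in for saved config
  private int startCount = 0;                   // guarded by lock
  private int stopCount = 0;                    // guarded by lock

  void notifyStatusChanged(boolean isLeader, boolean inSafeMode) {
    if (!isLeader || inSafeMode) {
      boolean shouldStop;
      lock.lock();
      try {
        shouldStop = running;
      } finally {
        lock.unlock();
      }
      if (shouldStop) {
        stop(); // stop() re-checks the state under the lock itself
      }
      return;
    }
    lock.lock();
    try {
      // Check-and-start under one lock hold: two concurrent notifications
      // cannot both see "not running" and start two balancing threads.
      if (!running && savedShouldRun) {
        running = true;
        startCount++;
      }
    } finally {
      lock.unlock();
    }
  }

  void stop() {
    lock.lock();
    try {
      if (!running) {
        return; // state may have changed since the caller checked
      }
      running = false;
      stopCount++;
    } finally {
      lock.unlock();
    }
  }

  int getStartCount() {
    lock.lock();
    try {
      return startCount;
    } finally {
      lock.unlock();
    }
  }

  int getStopCount() {
    lock.lock();
    try {
      return stopCount;
    } finally {
      lock.unlock();
    }
  }

  /** Fires the same leader notification from several threads at once. */
  static int concurrentStarts(int threads) {
    BalancerLifecycleSketch balancer = new BalancerLifecycleSketch();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> balancer.notifyStatusChanged(true, false));
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return balancer.getStartCount();
  }
}
```

`stop()` re-checks `running` under the lock. A stop decision that goes stale between the two lock holds is therefore harmless: it becomes a no-op.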
   



[GitHub] [ozone] sumitagrawl commented on pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on PR #3782:
URL: https://github.com/apache/ozone/pull/3782#issuecomment-1280012897

   > The logic looks good overall. I have some minor comments below. Also, I noticed that there is a lot of repeated code between `TestContainerBalancer` and `TestContainerBalancerTask`. Maybe we can keep the tests in one class? Or avoid redoing the setup logic.
   
   TestContainerBalancer has been refactored so that it no longer sets up a ContainerBalancerTask to simulate moveContainer, since that is not required there. What these tests do need is for the balancing thread to keep running for a while. This is achieved by enabling the du trigger so that the task waits for 60 seconds, and the test setup configuration has been updated accordingly.


[GitHub] [ozone] lokeshj1703 commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r985684506


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -893,32 +80,36 @@ private void resetState() {
   @Override
   public void notifyStatusChanged() {
     boolean shouldStop = false;
-    boolean shouldRun = false;
     lock.lock();
     try {
       if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
-        shouldStop = isBalancerRunning();
-      } else {
-        shouldRun = shouldRun();
+        shouldStop = canBalancerStop();
       }
     } finally {
       lock.unlock();
     }
-
     if (shouldStop) {
       LOG.info("Stopping ContainerBalancer in this scm on status change");
       stop();
+      return;
     }
 
-    if (shouldRun) {
-      LOG.info("Starting ContainerBalancer in this scm on status change");
-      try {
-        start();
-      } catch (IllegalContainerBalancerStateException |
-          InvalidContainerBalancerConfigurationException e) {
-        LOG.warn("Could not start ContainerBalancer on raft/safe-mode " +
-            "status change.", e);
+    lock.lock();

Review Comment:
   It is a bit inconsistent that we take the lock for the start call but not for the stop call above.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -893,32 +80,36 @@ private void resetState() {
   @Override
   public void notifyStatusChanged() {
     boolean shouldStop = false;
-    boolean shouldRun = false;
     lock.lock();
     try {
       if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
-        shouldStop = isBalancerRunning();
-      } else {
-        shouldRun = shouldRun();
+        shouldStop = canBalancerStop();

Review Comment:
   Unrelated to this PR: I think we can simplify the code.
   `shouldRun() = !shouldStop()` would be a reasonable statement.
   Also, shouldStop does not take the saved configuration into account, so shouldRun is the better function to use once the `!scmContext.isLeader() || scmContext.isInSafeMode()` checks are accounted for.
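One way to read this suggestion is shown in the hypothetical sketch below; `BalancerRunDecision` and its parameters are made up for illustration. It derives a single "should run" value from leadership, safe mode and the persisted flag, and treats "should stop" as its negation:

```java
/**
 * Hypothetical sketch: one shouldRun value from leadership, safe mode and
 * the persisted flag; stopping is simply its negation.
 */
class BalancerRunDecision {
  enum Action { START, STOP, NONE }

  static boolean shouldRun(boolean isLeader, boolean inSafeMode,
      boolean savedShouldRun) {
    return isLeader && !inSafeMode && savedShouldRun;
  }

  /** Maps the desired state plus the current state to an action. */
  static Action decide(boolean isLeader, boolean inSafeMode,
      boolean savedShouldRun, boolean running) {
    boolean run = shouldRun(isLeader, inSafeMode, savedShouldRun);
    if (run && !running) {
      return Action.START;
    }
    if (!run && running) {
      return Action.STOP; // shouldStop() == !shouldRun()
    }
    return Action.NONE;
  }
}
```

This sketch does not model the HA rule discussed in this PR, under which a stop caused by a status change should not persist `shouldRun = false`.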



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,102 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
-  public void testCalculationOfUtilization() {
-    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
-    for (int i = 0; i < nodesInCluster.size(); i++) {
-      Assertions.assertEquals(nodeUtilizations.get(i),
-          nodesInCluster.get(i).calculateUtilization(), 0.0001);
-    }
-
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster
-    Assertions.assertEquals(averageUtilization,
-        containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001);
-  }
-
-  /**
-   * Checks whether ContainerBalancer is correctly updating the list of
-   * unBalanced nodes with varying values of Threshold.
-   */
-  @Test
-  public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    List<DatanodeUsageInfo> expectedUnBalancedNodes;
-    List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
-
-    // check for random threshold values
-    for (int i = 0; i < 50; i++) {
-      double randomThreshold = RANDOM.nextDouble() * 100;
-
-      balancerConfiguration.setThreshold(randomThreshold);
-      startBalancer(balancerConfiguration);
-
-      // waiting for balance completed.
-      // TODO: this is a temporary implementation for now
-      // modify this after balancer is fully completed
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) { }
-
-      expectedUnBalancedNodes =
-          determineExpectedUnBalancedNodes(randomThreshold);
-      unBalancedNodesAccordingToBalancer =
-          containerBalancer.getUnBalancedNodes();
-
-      stopBalancer();
-      Assertions.assertEquals(
-          expectedUnBalancedNodes.size(),
-          unBalancedNodesAccordingToBalancer.size());
-
-      for (int j = 0; j < expectedUnBalancedNodes.size(); j++) {
-        Assertions.assertEquals(
-            expectedUnBalancedNodes.get(j).getDatanodeDetails(),
-            unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails());
-      }
-    }
+    Assertions.assertEquals(doRun, false);
+    containerBalancer.saveConfiguration(balancerConfiguration, true, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, true);
+    containerBalancer.saveConfiguration(balancerConfiguration, false, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, false);
   }
 
-  /**
-   * Checks whether the list of unBalanced nodes is empty when the cluster is
-   * balanced.
-   */
   @Test
-  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(99.99);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(100);
-
-    stopBalancer();
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
-    Assertions.assertEquals(0, metrics.getNumDatanodesUnbalanced());
-  }
-
-  /**
-   * ContainerBalancer should not involve more datanodes than the
-   * maxDatanodesRatioToInvolvePerIteration limit.
-   */
-  @Test
-  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    int percent = 20;
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
-        percent);
-    balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    int number = percent * numberOfNodes / 100;
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertFalse(
-        containerBalancer.getCountDatanodesInvolvedPerIteration() > number);
-    Assertions.assertTrue(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
-    Assertions.assertFalse(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > number);
-    stopBalancer();
-  }
+  public void testStartBalancerStop() throws Exception {
+    Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
+            Mockito.any(DatanodeDetails.class),
+            Mockito.any(DatanodeDetails.class)))
+        .thenReturn(genCompletableFuture(1000));
 
-  @Test
-  public void containerBalancerShouldSelectOnlyClosedContainers()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    // make all containers open, balancer should not select any of them
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
-    }
     balancerConfiguration.setThreshold(10);
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // balancer should have identified unbalanced nodes
-    Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
-    // no container should have been selected
-    Assertions.assertTrue(containerBalancer.getContainerToSourceMap()
-        .isEmpty());
-    /*
-    Iteration result should be CAN_NOT_BALANCE_ANY_MORE because no container
-    move is generated
-     */
-    Assertions.assertEquals(
-        ContainerBalancer.IterationResult.CAN_NOT_BALANCE_ANY_MORE,
-        containerBalancer.getIterationResult());
-
-    // now, close all containers
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.CLOSED);
-    }
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // check whether all selected containers are closed
-    for (ContainerID cid:
-         containerBalancer.getContainerToSourceMap().keySet()) {
-      Assertions.assertSame(
-          cidToInfoMap.get(cid).getState(), HddsProtos.LifeCycleState.CLOSED);
-    }
-  }
-
-  @Test
-  public void containerBalancerShouldObeyMaxSizeToMoveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT);
     balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    // balancer should not have moved more size than the limit
-    Assertions.assertFalse(
-        containerBalancer.getSizeScheduledForMoveInLatestIteration() >
-        10 * STORAGE_UNIT);
-
-    long size = containerBalancer.getMetrics()
-        .getDataSizeMovedGBInLatestIteration();
-    Assertions.assertTrue(size > 0);
-    Assertions.assertFalse(size > 10);
-    stopBalancer();
-  }
-
-  @Test
-  public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
+    balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);
     balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(1000);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> map =
-        containerBalancer.getContainerToTargetMap();
-    for (Map.Entry<ContainerID, DatanodeDetails> entry : map.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails target = entry.getValue();
-      Assertions.assertTrue(cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .noneMatch(target::equals));
-    }
-  }
+    balancerConfiguration.setMoveTimeout(Duration.ofMillis(1000));
 
-  @Test
-  public void containerMoveSelectionShouldFollowPlacementPolicy()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setIterations(1);
     startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> containerFromSourceMap =
-        containerBalancer.getContainerToSourceMap();
-    Map<ContainerID, DatanodeDetails> containerToTargetMap =
-        containerBalancer.getContainerToTargetMap();
-
-    // for each move selection, check if {replicas - source + target}
-    // satisfies placement policy
-    for (Map.Entry<ContainerID, DatanodeDetails> entry :
-        containerFromSourceMap.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails source = entry.getValue();
-
-      List<DatanodeDetails> replicas = cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .collect(Collectors.toList());
-      // remove source and add target
-      replicas.remove(source);
-      replicas.add(containerToTargetMap.get(container));
-
-      ContainerInfo containerInfo = cidToInfoMap.get(container);
-      ContainerPlacementStatus placementStatus =
-          placementPolicy.validateContainerPlacement(replicas,
-              containerInfo.getReplicationConfig().getRequiredNodes());
-      Assertions.assertTrue(placementStatus.isPolicySatisfied());
-    }
-  }
-
-  @Test
-  public void targetDatanodeShouldBeInServiceHealthy()
-      throws NodeNotFoundException, IllegalContainerBalancerStateException,
-      IOException, InvalidContainerBalancerConfigurationException,
-      TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    for (DatanodeDetails target : containerBalancer.getSelectedTargets()) {
-      NodeStatus status = mockNodeManager.getNodeStatus(target);
-      Assertions.assertSame(HddsProtos.NodeOperationalState.IN_SERVICE,
-          status.getOperationalState());
-      Assertions.assertTrue(status.isHealthy());
+    try {
+      containerBalancer.startBalancer(balancerConfiguration);
+    } catch (IllegalContainerBalancerStateException e) {
+      // start failed
+      Assertions.assertTrue(true);

Review Comment:
   Do we want to assert another variable here?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1137,45 +351,21 @@ public void stopBalancer()
       TimeoutException {
     lock.lock();
     try {
-      // should be leader, out of safe mode, and currently running
       validateState(true);
-      saveConfiguration(config, false, 0);

Review Comment:
   saveConfiguration could end up being called outside the lock if it is executed in ContainerBalancerTask.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1137,45 +351,21 @@ public void stopBalancer()
       TimeoutException {
     lock.lock();
     try {
-      // should be leader, out of safe mode, and currently running
       validateState(true);
-      saveConfiguration(config, false, 0);

Review Comment:
   Do we need this change?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Container balancer is a service in SCM to move containers between over- and
+ * under-utilized datanodes.
+ */
+public class ContainerBalancerTask implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerTask.class);
+
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ReplicationManager replicationManager;
+  private OzoneConfiguration ozoneConfiguration;
+  private ContainerBalancer containerBalancer;
+  private final SCMContext scmContext;
+  private double threshold;
+  private int totalNodesInCluster;
+  private double maxDatanodesRatioToInvolvePerIteration;
+  private long maxSizeToMovePerIteration;
+  private int countDatanodesInvolvedPerIteration;
+  private long sizeScheduledForMoveInLatestIteration;
+  // count actual size moved in bytes
+  private long sizeActuallyMovedInLatestIteration;
+  private int iterations;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
+  private Set<String> excludeNodes;
+  private Set<String> includeNodes;
+  private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private PlacementPolicyValidateProxy placementPolicyValidateProxy;
+  private NetworkTopology networkTopology;
+  private double upperLimit;
+  private double lowerLimit;
+  private ContainerBalancerSelectionCriteria selectionCriteria;
+  private volatile Status taskStatus = Status.RUNNING;
+
+  /*
+  Since a container can be selected only once during an iteration, these maps
+   use it as a primary key to track source to target pairings.
+  */
+  private final Map<ContainerID, DatanodeDetails> containerToSourceMap;
+  private final Map<ContainerID, DatanodeDetails> containerToTargetMap;
+
+  private Set<DatanodeDetails> selectedTargets;
+  private Set<DatanodeDetails> selectedSources;
+  private FindTargetStrategy findTargetStrategy;
+  private FindSourceStrategy findSourceStrategy;
+  private Map<ContainerMoveSelection,
+      CompletableFuture<LegacyReplicationManager.MoveResult>>
+      moveSelectionToFutureMap;
+  private IterationResult iterationResult;
+  private int nextIterationIndex;
+
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * ContainerBalancerMetrics. Container Balancer does not start on
+   * construction.
+   *
+   * @param scm the storage container manager
+   */
+  public ContainerBalancerTask(StorageContainerManager scm,
+                               int nextIterationIndex,
+                               ContainerBalancer containerBalancer,
+                               ContainerBalancerMetrics metrics,
+                               ContainerBalancerConfiguration config) {
+    this.nodeManager = scm.getScmNodeManager();
+    this.containerManager = scm.getContainerManager();
+    this.replicationManager = scm.getReplicationManager();
+    this.ozoneConfiguration = scm.getConfiguration();
+    this.containerBalancer = containerBalancer;
+    this.config = config;
+    this.metrics = metrics;
+    this.scmContext = scm.getScmContext();
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy();
+    this.networkTopology = scm.getClusterMap();
+    this.nextIterationIndex = nextIterationIndex;
+    this.containerToSourceMap = new HashMap<>();
+    this.containerToTargetMap = new HashMap<>();
+    this.selectedSources = new HashSet<>();
+    this.selectedTargets = new HashSet<>();
+    findSourceStrategy = new FindSourceGreedy(nodeManager);
+  }
+
+  /**
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
+   */
+  public void run() {
+    try {
+      balancer();
+
+      // mark balancer completion, if ha and stopped as not a leader, it will
+      // be handled inside for not saving configuration with leader check
+      saveConfiguration(config, false, 0);
+    } catch (Throwable e) {
+      LOG.error("Container Balancer is stopped abnormally, ", e);
+    }
+    taskStatus = Status.STOPPED;
+  }
+
+  /**
+   * Tries to stop ContainerBalancer changing status to stopping. Calls
+   * {@link ContainerBalancerTask#stop()}.
+   */
+  public void stop() {
+    taskStatus = Status.STOPPING;
+  }
+
+  private void balancer() {
+    this.iterations = config.getIterations();
+    if (this.iterations == -1) {
+      //run balancer infinitely
+      this.iterations = Integer.MAX_VALUE;
+    }
+
+    // nextIterationIndex is the iteration that balancer should start from on
+    // leader change or restart
+    int i = nextIterationIndex;
+    for (; i < iterations && isBalancerRunning(); i++) {
+      // reset some variables and metrics for this iteration
+      resetState();
+
+      if (!isBalancerRunning()) {
+        return;
+      }
+
+      if (config.getTriggerDuEnable()) {
+        // before starting a new iteration, we trigger all the datanode
+        // to run `du`. this is an aggressive action, with which we can
+        // get more precise usage info of all datanodes before moving.
+        // this is helpful for container balancer to make more appropriate
+        // decisions. this will increase the disk io load of data nodes, so
+        // please enable it with caution.
+        nodeManager.refreshAllHealthyDnUsageInfo();
+        try {
+          long nodeReportInterval =
+              ozoneConfiguration.getTimeDuration(HDDS_NODE_REPORT_INTERVAL,
+                  HDDS_NODE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+          // one for sending command , one for running du, and one for
+          // reporting back make it like this for now, a more suitable
+          // value. can be set in the future if needed
+          wait(3 * nodeReportInterval);

Review Comment:
   I think `wait` must be called inside a synchronized block, because the calling thread has to own the object's monitor.
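For reference, `Object.wait()` throws `IllegalMonitorStateException` when the calling thread does not own the object's monitor. In the quoted diff, `wait(3 * nodeReportInterval)` is called on the task from `balancer()`, which is not declared `synchronized`. Unless the caller already holds the task's monitor, that call would likely throw. Here is a minimal standalone demonstration (class and method names are made up for illustration):

```java
/**
 * Minimal demonstration of the monitor-ownership rule for Object.wait().
 */
class MonitorOwnershipDemo {
  /** Calls wait() without holding the monitor; reports whether it threw. */
  static boolean waitWithoutMonitorThrows() {
    Object monitor = new Object();
    try {
      monitor.wait(10);
      return false;
    } catch (IllegalMonitorStateException e) {
      // Thrown because the current thread does not own the monitor.
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Same call while holding the monitor: returns after the timeout. */
  static boolean waitWithMonitorReturns() {
    Object monitor = new Object();
    synchronized (monitor) {
      try {
        monitor.wait(10);
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```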



[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r990446415


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1092,33 +315,24 @@ public void stop() {
     Thread balancingThread;
     lock.lock();
     try {
-      if (!isBalancerRunning()) {
-        LOG.warn("Cannot stop Container Balancer because it's not running");
+      if (!canBalancerStop()) {
+        LOG.warn("Cannot stop Container Balancer because it's not running or " +
+            "stopping");
         return;
       }
+      task.stop();
       balancingThread = currentBalancingThread;

Review Comment:
   Fixed an issue with state synchronization when the status is set to STOPPED or STOPPING. A temporary variable is needed because the state can change outside the lock.
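The point about needing a temporary variable can be illustrated with a rough sketch. It is loosely modelled on the quoted `stop()` diff, where `task.stop()` and `balancingThread = currentBalancingThread` both happen under the lock. `SketchTask` and `CapturedThreadBalancer` are hypothetical. This sketch joins the thread with a timeout, which is an assumption; the real code may wait differently:

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical per-run task with its own stop flag. */
class SketchTask implements Runnable {
  private volatile boolean stopping = false;

  @Override
  public void run() {
    while (!stopping) {
      try {
        Thread.sleep(10); // stand-in for one balancing iteration
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  void stop() {
    stopping = true;
  }
}

/** Hypothetical owner that starts and stops SketchTask threads. */
class CapturedThreadBalancer {
  private final ReentrantLock lock = new ReentrantLock();
  private SketchTask task;                 // guarded by lock
  private Thread currentBalancingThread;   // guarded by lock

  void start() {
    lock.lock();
    try {
      if (task != null) {
        return; // already running
      }
      task = new SketchTask();
      currentBalancingThread = new Thread(task, "balancer-sketch");
      currentBalancingThread.start();
    } finally {
      lock.unlock();
    }
  }

  /** Returns true if a running thread was stopped and has terminated. */
  boolean stop() {
    Thread balancingThread;
    lock.lock();
    try {
      if (task == null) {
        return false; // nothing to stop
      }
      task.stop();
      // Copy the reference while holding the lock: after unlock, a
      // concurrent start() may overwrite currentBalancingThread.
      balancingThread = currentBalancingThread;
      task = null;
      currentBalancingThread = null;
    } finally {
      lock.unlock();
    }
    // Join outside the lock so a slow iteration does not block other callers.
    balancingThread.interrupt();
    try {
      balancingThread.join(10_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !balancingThread.isAlive();
  }
}
```

Because each run gets its own task object and stop flag, a quick start, stop, start sequence cannot make the old thread observe the new run's state.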



[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r991029575


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1137,45 +351,21 @@ public void stopBalancer()
       TimeoutException {
     lock.lock();
     try {
-      // should be leader, out of safe mode, and currently running
       validateState(true);
-      saveConfiguration(config, false, 0);

Review Comment:
   For HA, a stop initiated by SCM itself should not call `saveConfiguration` with `false`, whereas a CLI-based stop should set it to `false` as part of the normal flow.
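   A hedged sketch of the two stop paths described above. `StopPathsSketch` is hypothetical, and the in-memory `persistedShouldRun` flag is only a stand-in for whatever `saveConfiguration` persists; the example assumes that flag is what `shouldRun()` later reads. The CLI stop persists `false`, while the SCM-initiated stop only changes in-memory state, so the persisted intent to run survives.

```java
// Hypothetical sketch: an in-memory flag stands in for the state written by
// saveConfiguration(); method names mirror the review discussion only.
final class StopPathsSketch {
  private volatile boolean persistedShouldRun;
  private volatile boolean running;

  /** CLI start: persist the intent to run, then start. */
  void startBalancer() {
    persistedShouldRun = true;
    running = true;
  }

  /** CLI stop: the operator wants balancing off, so persist false. */
  void stopBalancer() {
    persistedShouldRun = false;
    stop();
  }

  /** SCM-initiated stop (assumed: e.g. shutdown or losing leadership): in-memory only. */
  void stop() {
    running = false;
  }

  /** Whether the balancer should be started again later. */
  boolean shouldRun() {
    return persistedShouldRun;
  }

  boolean isRunning() {
    return running;
  }
}
```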





[GitHub] [ozone] siddhantsangwan commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r994670075


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1032 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Container balancer is a service in SCM to move containers between over- and
+ * under-utilized datanodes.
+ */
+public class ContainerBalancerTask implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerTask.class);
+
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ReplicationManager replicationManager;
+  private OzoneConfiguration ozoneConfiguration;
+  private ContainerBalancer containerBalancer;
+  private final SCMContext scmContext;
+  private double threshold;
+  private int totalNodesInCluster;
+  private double maxDatanodesRatioToInvolvePerIteration;
+  private long maxSizeToMovePerIteration;
+  private int countDatanodesInvolvedPerIteration;
+  private long sizeScheduledForMoveInLatestIteration;
+  // count actual size moved in bytes
+  private long sizeActuallyMovedInLatestIteration;
+  private int iterations;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
+  private Set<String> excludeNodes;
+  private Set<String> includeNodes;
+  private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private PlacementPolicyValidateProxy placementPolicyValidateProxy;
+  private NetworkTopology networkTopology;
+  private double upperLimit;
+  private double lowerLimit;
+  private ContainerBalancerSelectionCriteria selectionCriteria;
+  private volatile Status taskStatus = Status.RUNNING;
+
+  /*
+  Since a container can be selected only once during an iteration, these maps
+   use it as a primary key to track source to target pairings.
+  */
+  private final Map<ContainerID, DatanodeDetails> containerToSourceMap;
+  private final Map<ContainerID, DatanodeDetails> containerToTargetMap;
+
+  private Set<DatanodeDetails> selectedTargets;
+  private Set<DatanodeDetails> selectedSources;
+  private FindTargetStrategy findTargetStrategy;
+  private FindSourceStrategy findSourceStrategy;
+  private Map<ContainerMoveSelection,
+      CompletableFuture<LegacyReplicationManager.MoveResult>>
+      moveSelectionToFutureMap;
+  private IterationResult iterationResult;
+  private int nextIterationIndex;
+
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * ContainerBalancerMetrics. Container Balancer does not start on
+   * construction.
+   *
+   * @param scm the storage container manager
+   */
+  public ContainerBalancerTask(StorageContainerManager scm,
+                               int nextIterationIndex,
+                               ContainerBalancer containerBalancer,
+                               ContainerBalancerMetrics metrics,
+                               ContainerBalancerConfiguration config) {
+    this.nodeManager = scm.getScmNodeManager();
+    this.containerManager = scm.getContainerManager();
+    this.replicationManager = scm.getReplicationManager();
+    this.ozoneConfiguration = scm.getConfiguration();
+    this.containerBalancer = containerBalancer;
+    this.config = config;
+    this.metrics = metrics;
+    this.scmContext = scm.getScmContext();
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy();
+    this.networkTopology = scm.getClusterMap();
+    this.nextIterationIndex = nextIterationIndex;
+    this.containerToSourceMap = new HashMap<>();
+    this.containerToTargetMap = new HashMap<>();
+    this.selectedSources = new HashSet<>();
+    this.selectedTargets = new HashSet<>();
+    findSourceStrategy = new FindSourceGreedy(nodeManager);
+  }
+
+  /**
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
+   */
+  public void run() {
+    try {
+      balancer();
+    } catch (Throwable e) {
+      LOG.error("Container Balancer is stopped abnormally, ", e);
+    }

Review Comment:
   Why are we catching a Throwable here?
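   One possible alternative to catching `Throwable`, sketched under assumptions (this is not the PR's code; `RunFinallySketch` and its `work` field are hypothetical stand-ins for the task and its `balancer()` loop): catch `Exception` for logging, and move the status update into a `finally` block. The status still ends up `STOPPED`, but `Error`s such as `OutOfMemoryError` are no longer silently swallowed.

```java
// Hypothetical alternative to catching Throwable; not the PR's code.
final class RunFinallySketch implements Runnable {
  enum Status { RUNNING, STOPPING, STOPPED }

  private final Runnable work; // stand-in for the balancing loop
  private volatile Status taskStatus = Status.RUNNING;

  RunFinallySketch(Runnable work) {
    this.work = work;
  }

  @Override
  public void run() {
    try {
      work.run();
    } catch (Exception e) {
      // Ordinary failures are logged and end the task.
      System.err.println("Container Balancer is stopped abnormally: " + e);
    } finally {
      // Runs even when an Error propagates, so the status cannot stay RUNNING.
      synchronized (this) {
        taskStatus = Status.STOPPED;
      }
    }
  }

  Status getStatus() {
    return taskStatus;
  }
}
```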



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1032 @@
+  /**
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
+   */

Review Comment:
   This doc seems wrong. I think it better describes the `balancer` method.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1032 @@
+  /**
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
+   */
+  public void run() {
+    try {
+      balancer();
+    } catch (Throwable e) {
+      LOG.error("Container Balancer is stopped abnormally, ", e);
+    }
+    synchronized (this) {
+      taskStatus = Status.STOPPED;
+    }
+  }
+
+  /**
+   * Tries to stop ContainerBalancer changing status to stopping. Calls
+   * {@link ContainerBalancerTask#stop()}.
+   */
+  public void stop() {
+    synchronized (this) {
+      if (taskStatus == Status.RUNNING) {
+        taskStatus = Status.STOPPING;
+      }
+    }
+  }
+
+  private void balancer() {

Review Comment:
   Let's change this from `balancer()` to `balance()`.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1032 @@
+  /**
+   * Tries to stop ContainerBalancer changing status to stopping. Calls
+   * {@link ContainerBalancerTask#stop()}.

Review Comment:
   ```suggestion
      * Changes Status from RUNNING to STOPPING.
   ```
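   A small sketch of the transition the suggested doc describes, with hypothetical names (`TaskStatusSketch`, `markStopped`, `isBalancerRunning`). The guard in `stop()` means a stop request that arrives after the task has already finished leaves the status at `STOPPED` instead of moving it back to `STOPPING`, which would otherwise look like a task stuck in stopping.

```java
// Hypothetical sketch of the RUNNING -> STOPPING -> STOPPED transitions.
final class TaskStatusSketch {
  enum Status { RUNNING, STOPPING, STOPPED }

  private volatile Status taskStatus = Status.RUNNING;

  /** Changes status from RUNNING to STOPPING; no effect in any other state. */
  synchronized void stop() {
    if (taskStatus == Status.RUNNING) {
      taskStatus = Status.STOPPING;
    }
  }

  /** Called by the task thread when it exits, normally or abnormally. */
  synchronized void markStopped() {
    taskStatus = Status.STOPPED;
  }

  /** What an iteration loop would check before starting the next iteration. */
  boolean isBalancerRunning() {
    return taskStatus == Status.RUNNING;
  }

  Status getStatus() {
    return taskStatus;
  }
}
```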



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
-  public void testCalculationOfUtilization() {
-    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
-    for (int i = 0; i < nodesInCluster.size(); i++) {
-      Assertions.assertEquals(nodeUtilizations.get(i),
-          nodesInCluster.get(i).calculateUtilization(), 0.0001);
-    }
-
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster
-    Assertions.assertEquals(averageUtilization,
-        containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001);
-  }
-
-  /**
-   * Checks whether ContainerBalancer is correctly updating the list of
-   * unBalanced nodes with varying values of Threshold.
-   */
-  @Test
-  public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    List<DatanodeUsageInfo> expectedUnBalancedNodes;
-    List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
-
-    // check for random threshold values
-    for (int i = 0; i < 50; i++) {
-      double randomThreshold = RANDOM.nextDouble() * 100;
-
-      balancerConfiguration.setThreshold(randomThreshold);
-      startBalancer(balancerConfiguration);
-
-      // waiting for balance completed.
-      // TODO: this is a temporary implementation for now
-      // modify this after balancer is fully completed
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) { }
-
-      expectedUnBalancedNodes =
-          determineExpectedUnBalancedNodes(randomThreshold);
-      unBalancedNodesAccordingToBalancer =
-          containerBalancer.getUnBalancedNodes();
-
-      stopBalancer();
-      Assertions.assertEquals(
-          expectedUnBalancedNodes.size(),
-          unBalancedNodesAccordingToBalancer.size());
-
-      for (int j = 0; j < expectedUnBalancedNodes.size(); j++) {
-        Assertions.assertEquals(
-            expectedUnBalancedNodes.get(j).getDatanodeDetails(),
-            unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails());
-      }
-    }
+    Assertions.assertEquals(doRun, false);
+    containerBalancer.saveConfiguration(balancerConfiguration, true, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, true);

Review Comment:
   ```suggestion
       Assertions.assertTrue(doRun);
   ```
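`testShouldRun` checks that `shouldRun()` becomes true after `saveConfiguration(balancerConfiguration, true, 0)`. In `ContainerBalancerTask#balancer()`, the third argument is the iteration to resume from. After `ITERATION_COMPLETED` the task saves `(true, i + 1)`, and `tryStopWithSaveConfiguration` saves `(false, 0)`. The sketch below models that resume protocol. It uses a hypothetical in-memory `PersistedState` in place of the SCM-persisted configuration and illustrates the idea; it is not the Ozone persistence code.

```java
import java.util.ArrayList;
import java.util.List;

public class BalancerResumeSketch {

  /** Hypothetical in-memory stand-in for the persisted balancer state. */
  static final class PersistedState {
    private boolean shouldRun = false;
    private int nextIterationIndex = 0;

    synchronized void save(boolean shouldRun, int nextIterationIndex) {
      this.shouldRun = shouldRun;
      this.nextIterationIndex = nextIterationIndex;
    }

    synchronized boolean shouldRun() {
      return shouldRun;
    }

    synchronized int nextIterationIndex() {
      return nextIterationIndex;
    }
  }

  /**
   * Runs iterations from the persisted index up to {@code iterations}.
   * {@code stopAfter} simulates a leader change or restart after that many
   * iterations. Returns the iteration indices that were executed.
   */
  static List<Integer> runTask(PersistedState state, int iterations,
                               int stopAfter) {
    List<Integer> executed = new ArrayList<>();
    if (!state.shouldRun()) {
      return executed;                 // balancer was never started
    }
    for (int i = state.nextIterationIndex(); i < iterations; i++) {
      if (executed.size() == stopAfter) {
        return executed;               // persisted state is still (true, i)
      }
      executed.add(i);                 // placeholder for one iteration
      state.save(true, i + 1);         // persist the next iteration index
    }
    state.save(false, 0);              // all iterations done: do not resume
    return executed;
  }

  public static void main(String[] args) {
    PersistedState state = new PersistedState();
    state.save(true, 0);                                      // start
    System.out.println(runTask(state, 5, 2));                 // [0, 1]
    System.out.println(runTask(state, 5, Integer.MAX_VALUE)); // [2, 3, 4]
    System.out.println(state.shouldRun() + " "
        + state.nextIterationIndex());                        // false 0
  }
}
```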



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster
+    Assertions.assertEquals(doRun, false);

Review Comment:
   ```suggestion
       Assertions.assertFalse(doRun);
   ```
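The removed tests in TestContainerBalancer checked the cluster's average utilization and which nodes are reported as unbalanced for random thresholds. `ContainerBalancerTask#initializeIteration` classifies nodes with `upperLimit = avg + threshold` and `lowerLimit = avg - threshold`, where a node's utilization is used / capacity. The sketch below shows that classification. It assumes the cluster average is aggregate used bytes over aggregate capacity, since the body of `calculateAvgUtilization` is not shown here. The `Node` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ThresholdClassificationSketch {

  /** Hypothetical datanode summary: capacity and used space in bytes. */
  static final class Node {
    final String id;
    final long capacity;
    final long used;

    Node(String id, long capacity, long used) {
      this.id = id;
      this.capacity = capacity;
      this.used = used;
    }

    double utilization() {
      return (double) used / capacity;
    }
  }

  /** Assumption: cluster average = aggregate used / aggregate capacity. */
  static double averageUtilization(List<Node> nodes) {
    long totalCapacity = 0;
    long totalUsed = 0;
    for (Node n : nodes) {
      totalCapacity += n.capacity;
      totalUsed += n.used;
    }
    return (double) totalUsed / totalCapacity;
  }

  /** Returns [overUtilized, underUtilized, withinThreshold] node ids. */
  static List<List<String>> classify(List<Node> nodes, double threshold) {
    double avg = averageUtilization(nodes);
    double upperLimit = avg + threshold;
    double lowerLimit = avg - threshold;
    List<String> over = new ArrayList<>();
    List<String> under = new ArrayList<>();
    List<String> within = new ArrayList<>();
    for (Node n : nodes) {
      double u = n.utilization();
      if (Double.compare(u, upperLimit) > 0) {
        over.add(n.id);
      } else if (Double.compare(u, lowerLimit) < 0) {
        under.add(n.id);
      } else {
        within.add(n.id);
      }
    }
    return List.of(over, under, within);
  }

  public static void main(String[] args) {
    List<Node> nodes = List.of(
        new Node("dn1", 100, 90),
        new Node("dn2", 100, 50),
        new Node("dn3", 100, 10));
    // average 0.5, threshold 0.1 -> limits 0.4 and 0.6
    System.out.println(classify(nodes, 0.10)); // [[dn1], [dn3], [dn2]]
  }
}
```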



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java:
##########
@@ -0,0 +1,1032 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Container balancer is a service in SCM to move containers between over- and
+ * under-utilized datanodes.
+ */
+public class ContainerBalancerTask implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerTask.class);
+
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ReplicationManager replicationManager;
+  private OzoneConfiguration ozoneConfiguration;
+  private ContainerBalancer containerBalancer;
+  private final SCMContext scmContext;
+  private double threshold;
+  private int totalNodesInCluster;
+  private double maxDatanodesRatioToInvolvePerIteration;
+  private long maxSizeToMovePerIteration;
+  private int countDatanodesInvolvedPerIteration;
+  private long sizeScheduledForMoveInLatestIteration;
+  // count actual size moved in bytes
+  private long sizeActuallyMovedInLatestIteration;
+  private int iterations;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
+  private Set<String> excludeNodes;
+  private Set<String> includeNodes;
+  private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private PlacementPolicyValidateProxy placementPolicyValidateProxy;
+  private NetworkTopology networkTopology;
+  private double upperLimit;
+  private double lowerLimit;
+  private ContainerBalancerSelectionCriteria selectionCriteria;
+  private volatile Status taskStatus = Status.RUNNING;
+
+  /*
+  Since a container can be selected only once during an iteration, these maps
+   use it as a primary key to track source to target pairings.
+  */
+  private final Map<ContainerID, DatanodeDetails> containerToSourceMap;
+  private final Map<ContainerID, DatanodeDetails> containerToTargetMap;
+
+  private Set<DatanodeDetails> selectedTargets;
+  private Set<DatanodeDetails> selectedSources;
+  private FindTargetStrategy findTargetStrategy;
+  private FindSourceStrategy findSourceStrategy;
+  private Map<ContainerMoveSelection,
+      CompletableFuture<LegacyReplicationManager.MoveResult>>
+      moveSelectionToFutureMap;
+  private IterationResult iterationResult;
+  private int nextIterationIndex;
+
+  /**
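+   * Constructs a ContainerBalancerTask with the specified arguments. The
+   * task does not start balancing on construction; balancing begins when
+   * {@link #run()} is executed.
+   *
+   * @param scm the storage container manager
+   * @param nextIterationIndex the iteration to start from, for example
+   *                           after a leader change or restart
+   * @param containerBalancer the ContainerBalancer used to persist
+   *                          configuration
+   * @param metrics metrics updated while balancing
+   * @param config the configuration used for this run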
+   */
+  public ContainerBalancerTask(StorageContainerManager scm,
+                               int nextIterationIndex,
+                               ContainerBalancer containerBalancer,
+                               ContainerBalancerMetrics metrics,
+                               ContainerBalancerConfiguration config) {
+    this.nodeManager = scm.getScmNodeManager();
+    this.containerManager = scm.getContainerManager();
+    this.replicationManager = scm.getReplicationManager();
+    this.ozoneConfiguration = scm.getConfiguration();
+    this.containerBalancer = containerBalancer;
+    this.config = config;
+    this.metrics = metrics;
+    this.scmContext = scm.getScmContext();
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy();
+    this.networkTopology = scm.getClusterMap();
+    this.nextIterationIndex = nextIterationIndex;
+    this.containerToSourceMap = new HashMap<>();
+    this.containerToTargetMap = new HashMap<>();
+    this.selectedSources = new HashSet<>();
+    this.selectedTargets = new HashSet<>();
+    findSourceStrategy = new FindSourceGreedy(nodeManager);
+  }
+
+  /**
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
+   */
+  public void run() {
+    try {
+      balancer();
+    } catch (Throwable e) {
+      LOG.error("Container Balancer is stopped abnormally, ", e);
+    }
+    synchronized (this) {
+      taskStatus = Status.STOPPED;
+    }
+  }
+
+  /**
+   * Changes status from RUNNING to STOPPING; has no effect in any other
+   * status.
+   */
+  public void stop() {
+    synchronized (this) {
+      if (taskStatus == Status.RUNNING) {
+        taskStatus = Status.STOPPING;
+      }
+    }
+  }
+
+  private void balancer() {
+    this.iterations = config.getIterations();
+    if (this.iterations == -1) {
+      //run balancer infinitely
+      this.iterations = Integer.MAX_VALUE;
+    }
+
+    // nextIterationIndex is the iteration that balancer should start from on
+    // leader change or restart
+    int i = nextIterationIndex;
+    for (; i < iterations && isBalancerRunning(); i++) {
+      // reset some variables and metrics for this iteration
+      resetState();
+      if (config.getTriggerDuEnable()) {
+        // before starting a new iteration, we trigger all the datanodes
+        // to run `du`. this is an aggressive action, with which we can
+        // get more precise usage info of all datanodes before moving.
+        // this is helpful for container balancer to make more appropriate
+        // decisions. this will increase the disk io load of data nodes, so
+        // please enable it with caution.
+        nodeManager.refreshAllHealthyDnUsageInfo();
+        try {
+          long nodeReportInterval =
+              ozoneConfiguration.getTimeDuration(HDDS_NODE_REPORT_INTERVAL,
+                  HDDS_NODE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+          // wait three node report intervals: one for sending the command,
+          // one for running du, and one for reporting back. a more suitable
+          // value can be set in the future if needed.
+          // Object#wait requires holding this object's monitor
+          synchronized (this) {
+            wait(3 * nodeReportInterval);
+          }
+        } catch (InterruptedException e) {
+          LOG.info("Container Balancer was interrupted while waiting for " +
+              "datanodes refreshing volume usage info");
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+
+      if (!isBalancerRunning()) {
+        return;
+      }
+
+      // initialize this iteration. stop balancing on initialization failure
+      if (!initializeIteration()) {
+        // just return if the reason for initialization failure is that
+        // balancer has been stopped in another thread
+        if (!isBalancerRunning()) {
+          return;
+        }
+        // otherwise, try to stop balancer
+        tryStopWithSaveConfiguration("Could not initialize " +
+            "ContainerBalancer's iteration number " + i);
+        return;
+      }
+
+      IterationResult iR = doIteration();
+      metrics.incrementNumIterations(1);
+      LOG.info("Result of this iteration of Container Balancer: {}", iR);
+
+      // if no new move option is generated, it means the cluster cannot be
+      // balanced anymore; so just stop balancer
+      if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) {
+        tryStopWithSaveConfiguration(iR.toString());
+        return;
+      }
+
+      // persist next iteration index
+      if (iR == IterationResult.ITERATION_COMPLETED) {
+        try {
+          saveConfiguration(config, true, i + 1);
+        } catch (IOException | TimeoutException e) {
+          LOG.warn("Could not persist next iteration index value for " +
+              "ContainerBalancer after completing an iteration", e);
+        }
+      }
+
+      // return if balancing has been stopped
+      if (!isBalancerRunning()) {
+        return;
+      }
+
+      // wait for configured time before starting next iteration, unless
+      // this was the final iteration
+      if (i != iterations - 1) {
+        try {
+          // Object#wait requires holding this object's monitor
+          synchronized (this) {
+            wait(config.getBalancingInterval().toMillis());
+          }
+        } catch (InterruptedException e) {
+          LOG.info("Container Balancer was interrupted while waiting for" +
+              " next iteration.");
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+
+    tryStopWithSaveConfiguration("Completed all iterations.");
+  }
+
+  /**
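+   * Tries to stop ContainerBalancer. Logs the reason for stopping, saves the
+   * configuration with shouldRun set to false, and then calls
+   * {@link #stop()}. If saving fails, the failure is logged and
+   * {@link #stop()} is not called.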
+   * @param stopReason a string specifying the reason for stopping
+   *                   ContainerBalancer.
+   */
+  private void tryStopWithSaveConfiguration(String stopReason) {
+    synchronized (this) {
+      try {
+        LOG.info("Save Configuration for stopping. Reason: {}", stopReason);
+        saveConfiguration(config, false, 0);
+        stop();
+      } catch (IOException | TimeoutException e) {
+        LOG.warn("Save configuration failed. Reason for " +
+            "stopping: {}", stopReason, e);
+      }
+    }
+  }
+
+  private void saveConfiguration(ContainerBalancerConfiguration configuration,
+                                 boolean shouldRun, int index)
+      throws IOException, TimeoutException {
+    if (!isValidSCMState()) {
+      LOG.warn("Saving configuration is not allowed because SCM is not " +
+          "in a valid state.");
+      return;
+    }
+    synchronized (this) {
+      if (isBalancerRunning()) {
+        containerBalancer.saveConfiguration(configuration, shouldRun, index);
+      }
+    }
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under, and
+   * within threshold utilized nodes. Decides whether balancing needs to
+   * continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    if (!isValidSCMState()) {
+      return false;
+    }
+    // sorted list in order from most to least used
+    List<DatanodeUsageInfo> datanodeUsageInfos =
+        nodeManager.getMostOrLeastUsedDatanodes(true);
+    if (datanodeUsageInfos.isEmpty()) {
+      LOG.warn("Received an empty list of datanodes from Node Manager when " +
+          "trying to identify which nodes to balance");
+      return false;
+    }
+
+    this.threshold = config.getThresholdAsRatio();
+    this.maxDatanodesRatioToInvolvePerIteration =
+        config.getMaxDatanodesRatioToInvolvePerIteration();
+    this.maxSizeToMovePerIteration = config.getMaxSizeToMovePerIteration();
+    if (config.getNetworkTopologyEnable()) {
+      findTargetStrategy = new FindTargetGreedyByNetworkTopology(
+          containerManager, placementPolicyValidateProxy,
+          nodeManager, networkTopology);
+    } else {
+      findTargetStrategy = new FindTargetGreedyByUsageInfo(containerManager,
+          placementPolicyValidateProxy, nodeManager);
+    }
+    this.excludeNodes = config.getExcludeNodes();
+    this.includeNodes = config.getIncludeNodes();
+    // include/exclude nodes from balancing according to configs
+    datanodeUsageInfos.removeIf(datanodeUsageInfo -> shouldExcludeDatanode(
+        datanodeUsageInfo.getDatanodeDetails()));
+
+    this.totalNodesInCluster = datanodeUsageInfos.size();
+
+    clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Average utilization of the cluster is {}",
+          clusterAvgUtilisation);
+    }
+
+    // over-utilized nodes have utilization (that is, used / capacity) greater
+    // than the upper limit
+    this.upperLimit = clusterAvgUtilisation + threshold;
+    // under-utilized nodes have utilization (that is, used / capacity) less
+    // than the lower limit
+    this.lowerLimit = clusterAvgUtilisation - threshold;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Lower limit for utilization is {} and Upper limit for " +
+          "utilization is {}", lowerLimit, upperLimit);
+    }
+
+    long totalOverUtilizedBytes = 0L, totalUnderUtilizedBytes = 0L;
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      if (!isBalancerRunning()) {
+        return false;
+      }
+      double utilization = datanodeUsageInfo.calculateUtilization();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Utilization for node {} with capacity {}B, used {}B, and " +
+                "remaining {}B is {}",
+            datanodeUsageInfo.getDatanodeDetails().getUuidString(),
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            datanodeUsageInfo.getScmNodeStat().getScmUsed().get(),
+            datanodeUsageInfo.getScmNodeStat().getRemaining().get(),
+            utilization);
+      }
+      if (Double.compare(utilization, upperLimit) > 0) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        metrics.incrementNumDatanodesUnbalanced(1);
+
+        // number of bytes by which this node exceeds the upper limit
+        long overUtilizedBytes = ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+        totalOverUtilizedBytes += overUtilizedBytes;
+      } else if (Double.compare(utilization, lowerLimit) < 0) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        metrics.incrementNumDatanodesUnbalanced(1);
+
+        // number of bytes by which this node falls below the lower limit
+        long underUtilizedBytes = ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+        totalUnderUtilizedBytes += underUtilizedBytes;
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    metrics.incrementDataSizeUnbalancedGB(
+        Math.max(totalOverUtilizedBytes, totalUnderUtilizedBytes) /
+            OzoneConsts.GB);
+    Collections.reverse(underUtilizedNodes);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
+
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
+      return false;
+    }
+
+    LOG.info("Container Balancer has identified {} Over-Utilized and {} " +
+            "Under-Utilized Datanodes that need to be balanced.",
+        overUtilizedNodes.size(), underUtilizedNodes.size());
+
+    if (LOG.isDebugEnabled()) {
+      overUtilizedNodes.forEach(entry -> {
+        LOG.debug("Datanode {} {} is Over-Utilized.",
+            entry.getDatanodeDetails().getHostName(),
+            entry.getDatanodeDetails().getUuid());
+      });
+
+      underUtilizedNodes.forEach(entry -> {
+        LOG.debug("Datanode {} {} is Under-Utilized.",
+            entry.getDatanodeDetails().getHostName(),
+            entry.getDatanodeDetails().getUuid());
+      });
+    }
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager, findSourceStrategy);
+    return true;
+  }
+
+  private boolean isValidSCMState() {
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe Mode.");
+      return false;
+    }
+    if (!scmContext.isLeaderReady()) {
+      LOG.warn("Current SCM is not the leader.");
+      return false;
+    }
+    return true;
+  }
+
+  private IterationResult doIteration() {
+    // note that potential and selected targets are updated in the following
+    // loop
+    //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+    // source and target
+    findSourceStrategy.reInitialize(getPotentialSources(), config, lowerLimit);
+    List<DatanodeUsageInfo> potentialTargets = getPotentialTargets();
+    findTargetStrategy.reInitialize(potentialTargets, config, upperLimit);
+
+    moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
+    boolean isMoveGeneratedInThisIteration = false;
+    iterationResult = IterationResult.ITERATION_COMPLETED;
+
+    // match each source node with a target
+    while (true) {
+      if (!isBalancerRunning()) {
+        iterationResult = IterationResult.ITERATION_INTERRUPTED;
+        break;
+      }
+
+      if (checkIterationLimits()) {
+        /* scheduled enough moves to hit either maxSizeToMovePerIteration or
+        maxDatanodesPercentageToInvolvePerIteration limit
+        */
+        break;
+      }
+
+      DatanodeDetails source =
+          findSourceStrategy.getNextCandidateSourceDataNode();
+      if (source == null) {
+        // no more source DNs are present
+        break;
+      }
+
+      ContainerMoveSelection moveSelection = matchSourceWithTarget(source);
+      if (moveSelection != null) {
+        if (processMoveSelection(source, moveSelection)) {
+          isMoveGeneratedInThisIteration = true;
+        }
+      } else {
+        // can not find any target for this source
+        findSourceStrategy.removeCandidateSourceDataNode(source);
+      }
+    }
+
+    checkIterationResults(isMoveGeneratedInThisIteration);
+    return iterationResult;
+  }
+
+  private boolean processMoveSelection(DatanodeDetails source,
+                                       ContainerMoveSelection moveSelection) {
+    ContainerID containerID = moveSelection.getContainerID();
+    if (containerToSourceMap.containsKey(containerID) ||
+        containerToTargetMap.containsKey(containerID)) {
+      LOG.warn("Container {} has already been selected for move from source " +
+              "{} to target {} earlier. Not moving this container again.",
+          containerID,
+          containerToSourceMap.get(containerID),
+          containerToTargetMap.get(containerID));
+      return false;
+    }
+
+    ContainerInfo containerInfo;
+    try {
+      containerInfo =
+          containerManager.getContainer(containerID);
+    } catch (ContainerNotFoundException e) {
+      LOG.warn("Could not get container {} from Container Manager before " +
+          "starting a container move", containerID, e);
+      return false;
+    }
+    LOG.info("ContainerBalancer is trying to move container {} with size " +
+            "{}B from source datanode {} to target datanode {}",
+        containerID.toString(),
+        containerInfo.getUsedBytes(),
+        source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    if (moveContainer(source, moveSelection)) {
+      // consider move successful for now, and update selection criteria
+      updateTargetsAndSelectionCriteria(moveSelection, source);
+    }
+    return true;
+  }
+
+  /**
+   * Check the iteration results. Result can be:
+   * <p>ITERATION_INTERRUPTED if balancing was stopped</p>
+   * <p>CAN_NOT_BALANCE_ANY_MORE if no move was generated during this iteration
+   * </p>
+   * <p>ITERATION_COMPLETED</p>
+   * @param isMoveGeneratedInThisIteration whether a move was generated during
+   *                                       the iteration
+   */
+  private void checkIterationResults(boolean isMoveGeneratedInThisIteration) {
+    if (!isMoveGeneratedInThisIteration) {
+      /*
+       If no move was generated during this iteration then we don't need to
+       check the move results
+       */
+      iterationResult = IterationResult.CAN_NOT_BALANCE_ANY_MORE;
+    } else {
+      checkIterationMoveResults();
+    }
+  }
+
+  /**
+   * Checks the results of all move operations when exiting an iteration.
+   */
+  private void checkIterationMoveResults() {
+    this.countDatanodesInvolvedPerIteration = 0;
+    CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(
+        moveSelectionToFutureMap.values()
+            .toArray(new CompletableFuture[moveSelectionToFutureMap.size()]));
+    try {
+      allFuturesResult.get(config.getMoveTimeout().toMillis(),
+          TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Container balancer is interrupted");
+      Thread.currentThread().interrupt();
+    } catch (TimeoutException e) {
+      long timeoutCounts = cancelAndCountPendingMoves();
+      LOG.warn("{} Container moves are canceled.", timeoutCounts);
+      metrics.incrementNumContainerMovesTimeoutInLatestIteration(timeoutCounts);
+    } catch (ExecutionException e) {
+      LOG.error("Got exception while checkIterationMoveResults", e);
+    }
+
+    countDatanodesInvolvedPerIteration =
+        selectedSources.size() + selectedTargets.size();
+    metrics.incrementNumDatanodesInvolvedInLatestIteration(
+        countDatanodesInvolvedPerIteration);
+    metrics.incrementNumContainerMovesCompleted(
+        metrics.getNumContainerMovesCompletedInLatestIteration());
+    metrics.incrementNumContainerMovesTimeout(
+        metrics.getNumContainerMovesTimeoutInLatestIteration());
+    metrics.incrementDataSizeMovedGBInLatestIteration(
+        sizeActuallyMovedInLatestIteration / OzoneConsts.GB);
+    metrics.incrementDataSizeMovedGB(
+        metrics.getDataSizeMovedGBInLatestIteration());
+    metrics.incrementNumContainerMovesFailed(
+        metrics.getNumContainerMovesFailedInLatestIteration());
+    LOG.info("Iteration Summary. Number of Datanodes involved: {}. Size " +
+            "moved: {} ({} Bytes). Number of Container moves completed: {}.",
+        countDatanodesInvolvedPerIteration,
+        StringUtils.byteDesc(sizeActuallyMovedInLatestIteration),
+        sizeActuallyMovedInLatestIteration,
+        metrics.getNumContainerMovesCompletedInLatestIteration());
+  }
+
+  private long cancelAndCountPendingMoves() {
+    return moveSelectionToFutureMap.entrySet().stream()
+        .filter(entry -> !entry.getValue().isDone())
+        .peek(entry -> {
+          LOG.warn("Container move timeout for container {} from source {}" +
+                  " to target {}.",
+              entry.getKey().getContainerID(),
+              containerToSourceMap.get(entry.getKey().getContainerID())
+                  .getUuidString(),
+              entry.getKey().getTargetNode().getUuidString());
+          entry.getValue().cancel(true);
+        }).count();
+  }
+
+  /**
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   *
+   * @return ContainerMoveSelection containing the selected target and container
+   */
+  private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) {
+    NavigableSet<ContainerID> candidateContainers =
+        selectionCriteria.getCandidateContainers(source);
+
+    if (candidateContainers.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ContainerBalancer could not find any candidate containers " +
+            "for datanode {}", source.getUuidString());
+      }
+      return null;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ContainerBalancer is finding suitable target for source " +
+          "datanode {}", source.getUuidString());
+    }
+    ContainerMoveSelection moveSelection =
+        findTargetStrategy.findTargetForContainerMove(
+            source, candidateContainers);
+
+    if (moveSelection == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ContainerBalancer could not find a suitable target for " +
+            "source node {}.", source.getUuidString());
+      }
+      return null;
+    }
+    LOG.info("ContainerBalancer matched source datanode {} with target " +
+            "datanode {} for container move.", source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    return moveSelection;
+  }
+
+  /**
+   * Checks if limits maxDatanodesPercentageToInvolvePerIteration and
+   * maxSizeToMovePerIteration have been hit.
+   *
+   * @return true if a limit was hit, else false
+   */
+  private boolean checkIterationLimits() {
+    int maxDatanodesToInvolve =
+        (int) (maxDatanodesRatioToInvolvePerIteration * totalNodesInCluster);
+    if (countDatanodesInvolvedPerIteration + 2 > maxDatanodesToInvolve) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Hit max datanodes to involve limit. {} datanodes have" +
+                " already been scheduled for balancing and the limit is {}.",
+            countDatanodesInvolvedPerIteration, maxDatanodesToInvolve);
+      }
+      return true;
+    }
+    if (sizeScheduledForMoveInLatestIteration +
+        (long) ozoneConfiguration.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
+        StorageUnit.BYTES) > maxSizeToMovePerIteration) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Hit max size to move limit. {} bytes have already been " +
+                "scheduled for balancing and the limit is {} bytes.",
+            sizeScheduledForMoveInLatestIteration,
+            maxSizeToMovePerIteration);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Asks {@link ReplicationManager} to move the specified container from
+   * source to target.
+   *
+   * @param source the source datanode
+   * @param moveSelection the selected container to move and target datanode
+   * @return false if an exception occurred or the move completed with a
+   * result other than LegacyReplicationManager.MoveResult.COMPLETED; true
+   * if the move completed with MoveResult.COMPLETED or is still in progress
+   */
+  private boolean moveContainer(DatanodeDetails source,
+                                ContainerMoveSelection moveSelection) {
+    ContainerID containerID = moveSelection.getContainerID();
+    CompletableFuture<LegacyReplicationManager.MoveResult> future;
+    try {
+      ContainerInfo containerInfo = containerManager.getContainer(containerID);
+      future = replicationManager
+          .move(containerID, source, moveSelection.getTargetNode())
+          .whenComplete((result, ex) -> {
+
+            metrics.incrementCurrentIterationContainerMoveMetric(result, 1);
+            if (ex != null) {
+              LOG.info("Container move for container {} from source {} to " +
+                      "target {} failed with exceptions {}",
+                  containerID.toString(),
+                  source.getUuidString(),
+                  moveSelection.getTargetNode().getUuidString(), ex);
+              metrics.incrementNumContainerMovesFailedInLatestIteration(1);
+            } else {
+              if (result == LegacyReplicationManager.MoveResult.COMPLETED) {
+                sizeActuallyMovedInLatestIteration +=
+                    containerInfo.getUsedBytes();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Container move completed for container {} from " +
+                          "source {} to target {}", containerID,
+                      source.getUuidString(),
+                      moveSelection.getTargetNode().getUuidString());
+                }
+              } else {
+                LOG.warn(
+                    "Container move for container {} from source {} to target" +
+                        " {} failed: {}",
+                    moveSelection.getContainerID(), source.getUuidString(),
+                    moveSelection.getTargetNode().getUuidString(), result);
+              }
+            }
+          });
+    } catch (ContainerNotFoundException e) {
+      LOG.warn("Could not find Container {} for container move",
+          containerID, e);
+      metrics.incrementNumContainerMovesFailedInLatestIteration(1);
+      return false;
+    } catch (NodeNotFoundException | TimeoutException e) {
+      LOG.warn("Container move failed for container {}", containerID, e);
+      metrics.incrementNumContainerMovesFailedInLatestIteration(1);
+      return false;
+    }
+
+    if (future.isDone()) {
+      if (future.isCompletedExceptionally()) {
+        return false;
+      } else {
+        LegacyReplicationManager.MoveResult result = future.join();
+        moveSelectionToFutureMap.put(moveSelection, future);
+        return result == LegacyReplicationManager.MoveResult.COMPLETED;
+      }
+    } else {
+      moveSelectionToFutureMap.put(moveSelection, future);
+      return true;
+    }
+  }
+
+  /**
+   * Update targets, sources, and selection criteria after a move.
+   *
+   * @param moveSelection latest selected target datanode and container
+   * @param source        the source datanode
+   */
+  private void updateTargetsAndSelectionCriteria(
+      ContainerMoveSelection moveSelection, DatanodeDetails source) {
+    ContainerID containerID = moveSelection.getContainerID();
+    DatanodeDetails target = moveSelection.getTargetNode();
+
+    // count source if it has not been involved in move earlier
+    if (!selectedSources.contains(source)) {
+      countDatanodesInvolvedPerIteration += 1;
+    }
+    // count target if it has not been involved in move earlier
+    if (!selectedTargets.contains(target)) {
+      countDatanodesInvolvedPerIteration += 1;
+    }
+
+    incSizeSelectedForMoving(source, moveSelection);
+    containerToSourceMap.put(containerID, source);
+    containerToTargetMap.put(containerID, target);
+    selectedTargets.add(target);
+    selectedSources.add(source);
+    selectionCriteria.setSelectedContainers(
+        new HashSet<>(containerToSourceMap.keySet()));
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity     capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private long ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return (long) (nodeCapacity * utilizationRatio);
+  }
+
+  /**
+   * Calculates the average utilization for the specified nodes.
+   * Utilization is (capacity - remaining) divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   */
+  @VisibleForTesting
+  double calculateAvgUtilization(List<DatanodeUsageInfo> nodes) {
+    if (nodes.size() == 0) {
+      LOG.warn("No nodes to calculate average utilization for in " +
+          "ContainerBalancer.");
+      return 0;
+    }
+    SCMNodeStat aggregatedStats = new SCMNodeStat(
+        0, 0, 0);
+    for (DatanodeUsageInfo node : nodes) {
+      aggregatedStats.add(node.getScmNodeStat());
+    }
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    return (clusterCapacity - clusterRemaining) / (double) clusterCapacity;
+  }
+
+  /**
+   * Gets potential targets for container move. Currently these are the
+   * under-utilized nodes; within-threshold nodes are not yet included.
+   *
+   * @return A list of potential target DatanodeUsageInfo.
+   */
+  private List<DatanodeUsageInfo> getPotentialTargets() {
+    //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+    // source and target
+    return underUtilizedNodes;
+  }
+
+  /**
+   * Gets potential sources for container move. Currently these are the
+   * over-utilized nodes; within-threshold nodes are not yet included.
+   *
+   * @return A list of potential source DatanodeUsageInfo.
+   */
+  private List<DatanodeUsageInfo> getPotentialSources() {
+    //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+    // source and target
+    return overUtilizedNodes;
+  }
+
+  /**
+   * Consults the configurations {@link ContainerBalancerTask#includeNodes} and
+   * {@link ContainerBalancerTask#excludeNodes} to check if the specified
+   * Datanode should be excluded from balancing.
+   * @param datanode DatanodeDetails to check
+   * @return true if Datanode should be excluded, else false
+   */
+  private boolean shouldExcludeDatanode(DatanodeDetails datanode) {
+    if (excludeNodes.contains(datanode.getHostName()) ||
+        excludeNodes.contains(datanode.getIpAddress())) {
+      return true;
+    } else if (!includeNodes.isEmpty()) {
+      return !includeNodes.contains(datanode.getHostName()) &&
+          !includeNodes.contains(datanode.getIpAddress());
+    }
+    return false;
+  }
+
+  /**
+   * Updates conditions for balancing, such as total size moved by balancer,
+   * total size that has entered a datanode, and total size that has left a
+   * datanode in this iteration.
+   *
+   * @param source        source datanode
+   * @param moveSelection selected target datanode and container
+   */
+  private void incSizeSelectedForMoving(DatanodeDetails source,
+                                        ContainerMoveSelection moveSelection) {
+    DatanodeDetails target = moveSelection.getTargetNode();
+    ContainerInfo container;
+    try {
+      container =
+          containerManager.getContainer(moveSelection.getContainerID());
+    } catch (ContainerNotFoundException e) {
+      LOG.warn("Could not find Container {} while matching source and " +
+              "target nodes in ContainerBalancer",
+          moveSelection.getContainerID(), e);
+      return;
+    }
+    long size = container.getUsedBytes();
+    sizeScheduledForMoveInLatestIteration += size;
+
+    // update sizeLeavingNode map with the recent moveSelection
+    findSourceStrategy.increaseSizeLeaving(source, size);
+
+    // update sizeEnteringNode map with the recent moveSelection
+    findTargetStrategy.increaseSizeEntering(target, size);
+  }
+
+  /**
+   * Resets some variables and metrics for this iteration.
+   */
+  private void resetState() {
+    this.clusterCapacity = 0L;
+    this.clusterRemaining = 0L;
+    this.overUtilizedNodes.clear();
+    this.underUtilizedNodes.clear();
+    this.unBalancedNodes.clear();
+    this.containerToSourceMap.clear();
+    this.containerToTargetMap.clear();
+    this.selectedSources.clear();
+    this.selectedTargets.clear();
+    this.countDatanodesInvolvedPerIteration = 0;
+    this.sizeScheduledForMoveInLatestIteration = 0;
+    this.sizeActuallyMovedInLatestIteration = 0;
+    metrics.resetDataSizeMovedGBInLatestIteration();
+    metrics.resetNumContainerMovesCompletedInLatestIteration();
+    metrics.resetNumContainerMovesTimeoutInLatestIteration();
+    metrics.resetNumDatanodesInvolvedInLatestIteration();
+    metrics.resetDataSizeUnbalancedGB();
+    metrics.resetNumDatanodesUnbalanced();
+    metrics.resetNumContainerMovesFailedInLatestIteration();
+  }
+
+  /**
+   * Checks if ContainerBalancer is currently running in this SCM.
+   *
+   * @return true if the currentBalancingThread is not null, otherwise false

Review Comment:
   This is no longer correct.
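
The review point above is that this javadoc still describes the pre-refactor design, in which "running" was inferred from `currentBalancingThread` being non-null. A minimal sketch of an alternative, assuming a hypothetical `BalancerLifecycle` class with an explicit `Status` enum (illustrative names, not Ozone API): the running check reads an explicit state, and a start requested while a previous worker is still stopping is refused instead of leaving two threads alive, which is the start/stop/start hazard this PR targets.

```java
/**
 * Hypothetical lifecycle holder; BalancerLifecycle and Status are
 * illustrative names, not part of Ozone.
 */
class BalancerLifecycle {
  /** Explicit states instead of inferring "running" from a thread field. */
  enum Status { STOPPED, RUNNING, STOPPING }

  private final Object lock = new Object();
  private Status status = Status.STOPPED;
  private Thread worker;

  /** Starts a worker only from STOPPED; RUNNING or STOPPING is rejected. */
  void start(Runnable task) {
    synchronized (lock) {
      if (status != Status.STOPPED) {
        throw new IllegalStateException("Cannot start while " + status);
      }
      status = Status.RUNNING;
      worker = new Thread(() -> {
        try {
          task.run();
        } finally {
          // Only the exiting worker moves the state back to STOPPED, so a
          // new start cannot overlap with a worker that is still alive.
          synchronized (lock) {
            status = Status.STOPPED;
          }
        }
      });
      worker.start();
    }
  }

  /** Interrupts the worker and waits for it; no-op unless RUNNING. */
  void stopAndWait() {
    Thread toJoin;
    synchronized (lock) {
      if (status != Status.RUNNING) {
        return;
      }
      status = Status.STOPPING;
      toJoin = worker;
    }
    toJoin.interrupt();
    try {
      // Join outside the lock so the worker's finally block can acquire it.
      toJoin.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt
    }
  }

  /** True only while a worker is RUNNING; STOPPING does not count. */
  boolean isBalancerRunning() {
    synchronized (lock) {
      return status == Status.RUNNING;
    }
  }

  Status status() {
    synchronized (lock) {
      return status;
    }
  }
}
```

Keeping the state in one field guarded by one lock lets the javadoc describe the state itself (for example, "true if the status is RUNNING") rather than an implementation detail such as a thread reference.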

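In `checkIterationLimits` earlier in this hunk, the datanode check adds 2 because one more move can involve up to two datanodes not yet counted (a source and a target), and it does so even if the next move reuses an already-counted node. The size check adds the configured container size (`ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE`), which appears to serve as a worst-case estimate for the next container. The sketch below restates that arithmetic as pure functions so the boundary cases are easy to check; `IterationLimits` and its parameters are hypothetical stand-ins for the balancer's fields and `OzoneConfiguration` lookups.

```java
/**
 * Hypothetical restatement of the two checks in checkIterationLimits();
 * plain parameters replace the balancer's fields and configuration.
 */
final class IterationLimits {
  private IterationLimits() {
  }

  /** +2 reserves room for one more source and one more target datanode. */
  static boolean datanodeLimitHit(int involvedSoFar, double maxRatio,
      int totalNodes) {
    int maxDatanodesToInvolve = (int) (maxRatio * totalNodes);
    return involvedSoFar + 2 > maxDatanodesToInvolve;
  }

  /** Adds one full configured container size before comparing. */
  static boolean sizeLimitHit(long scheduledBytes, long containerSizeBytes,
      long maxBytesPerIteration) {
    return scheduledBytes + containerSizeBytes > maxBytesPerIteration;
  }
}
```

For example, with 10 datanodes and a ratio of 0.5 the cap is 5: after 3 datanodes are involved another move is still allowed (3 + 2 = 5), but after 4 it is not (6 > 5). Likewise, with a 5 GB container size and a 50 GB per-iteration cap, scheduling stops once more than 45 GB has been scheduled.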

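`moveContainer` returns a boolean whose meaning depends on the state of the future returned by `ReplicationManager#move` at the moment of the check: a move still in flight counts as success (its future is kept in `moveSelectionToFutureMap`), a future that has already failed counts as failure, and an already-finished move succeeds only with `COMPLETED`. A minimal sketch of that mapping, using a hypothetical `MoveOutcome` helper and a hypothetical two-value `Result` enum in place of `LegacyReplicationManager.MoveResult`:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper mirroring the return logic of moveContainer(). */
final class MoveOutcome {
  /** Hypothetical stand-in for LegacyReplicationManager.MoveResult. */
  enum Result { COMPLETED, FAILED }

  private MoveOutcome() {
  }

  static boolean interpret(CompletableFuture<Result> future) {
    if (!future.isDone()) {
      // Still in flight: treated as success for now.
      return true;
    }
    if (future.isCompletedExceptionally()) {
      // Failed (or was cancelled) before the caller looked at it.
      return false;
    }
    // Finished normally: only COMPLETED counts as success.
    return future.join() == Result.COMPLETED;
  }
}
```

One consequence of this design is that a `true` return does not yet mean any data has moved; the pending future still has to be examined later.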

##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java:
##########
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link ContainerBalancerTask}.
+ */
+public class TestContainerBalancerTask {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancerTask.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManager containerManager;
+  private ContainerBalancerTask containerBalancerTask;
+  private MockNodeManager mockNodeManager;
+  private StorageContainerManager scm;
+  private OzoneConfiguration conf;
+  private PlacementPolicy placementPolicy;
+  private PlacementPolicy ecPlacementPolicy;
+  private PlacementPolicyValidateProxy placementPolicyValidateProxy;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+  private Map<ContainerID, Set<ContainerReplica>> cidToReplicasMap =
+      new HashMap<>();
+  private Map<ContainerID, ContainerInfo> cidToInfoMap = new HashMap<>();
+  private Map<DatanodeUsageInfo, Set<ContainerID>> datanodeToContainersMap =
+      new HashMap<>();
+  private Map<String, ByteString> serviceToConfigMap = new HashMap<>();
+  private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
+
+  private StatefulServiceStateManager serviceStateManager;
+  private static final long STORAGE_UNIT = OzoneConsts.GB;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @BeforeEach
+  public void setup() throws IOException, NodeNotFoundException,
+      TimeoutException {
+    conf = new OzoneConfiguration();
+    scm = Mockito.mock(StorageContainerManager.class);
+    containerManager = Mockito.mock(ContainerManager.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
+    SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class);
+
+    // these configs will usually be specified in each test
+    balancerConfiguration =
+        conf.getObject(ContainerBalancerConfiguration.class);
+    balancerConfiguration.setThreshold(10);
+    balancerConfiguration.setIterations(1);
+    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
+    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
+    balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
+    conf.setFromObject(balancerConfiguration);
+    GenericTestUtils.setLogLevel(ContainerBalancer.LOG, Level.DEBUG);
+
+    averageUtilization = createCluster();
+    mockNodeManager = new MockNodeManager(datanodeToContainersMap);
+
+    NetworkTopology clusterMap = mockNodeManager.getClusterNetworkTopologyMap();
+
+    placementPolicy = ContainerPlacementPolicyFactory
+        .getPolicy(conf, mockNodeManager, clusterMap, true,
+            SCMContainerPlacementMetrics.create());
+    ecPlacementPolicy = ContainerPlacementPolicyFactory.getECPolicy(
+        conf, mockNodeManager, clusterMap,
+        true, SCMContainerPlacementMetrics.create());
+    placementPolicyValidateProxy = new PlacementPolicyValidateProxy(
+        placementPolicy, ecPlacementPolicy);
+
+    Mockito.when(replicationManager
+        .isContainerReplicatingOrDeleting(Mockito.any(ContainerID.class)))
+        .thenReturn(false);
+
+    Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
+        Mockito.any(DatanodeDetails.class),
+        Mockito.any(DatanodeDetails.class)))
+        .thenReturn(CompletableFuture.completedFuture(MoveResult.COMPLETED));
+
+    when(containerManager.getContainerReplicas(Mockito.any(ContainerID.class)))
+        .thenAnswer(invocationOnMock -> {
+          ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0];
+          return cidToReplicasMap.get(cid);
+        });
+
+    when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+        .thenAnswer(invocationOnMock -> {
+          ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0];
+          return cidToInfoMap.get(cid);
+        });
+
+    when(containerManager.getContainers())
+        .thenReturn(new ArrayList<>(cidToInfoMap.values()));
+
+    when(scm.getScmNodeManager()).thenReturn(mockNodeManager);
+    when(scm.getContainerPlacementPolicy()).thenReturn(placementPolicy);
+    when(scm.getContainerManager()).thenReturn(containerManager);
+    when(scm.getReplicationManager()).thenReturn(replicationManager);
+    when(scm.getScmContext()).thenReturn(SCMContext.emptyContext());
+    when(scm.getClusterMap()).thenReturn(null);
+    when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class));
+    when(scm.getConfiguration()).thenReturn(conf);
+    when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager);
+    when(scm.getSCMServiceManager()).thenReturn(scmServiceManager);
+    when(scm.getPlacementPolicyValidateProxy())
+        .thenReturn(placementPolicyValidateProxy);
+
+    /*
+    When StatefulServiceStateManager#saveConfiguration is called, save to
+    in-memory serviceToConfigMap instead.
+     */
+    Mockito.doAnswer(i -> {
+      serviceToConfigMap.put(i.getArgument(0, String.class), i.getArgument(1,
+          ByteString.class));
+      return null;
+    }).when(serviceStateManager).saveConfiguration(
+        Mockito.any(String.class),
+        Mockito.any(ByteString.class));
+
+    /*
+    When StatefulServiceStateManager#readConfiguration is called, read from
+    serviceToConfigMap instead.
+     */
+    when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer(
+        i -> serviceToConfigMap.get(i.getArgument(0, String.class)));
+
+    Mockito.doNothing().when(scmServiceManager)
+        .register(Mockito.any(SCMService.class));
+    ContainerBalancer sb = new ContainerBalancer(scm);
+    containerBalancerTask = new ContainerBalancerTask(scm, 0, sb,
+        sb.getMetrics(), null);
+  }
+
+  @Test
+  public void testCalculationOfUtilization() {
+    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
+    for (int i = 0; i < nodesInCluster.size(); i++) {
+      Assertions.assertEquals(nodeUtilizations.get(i),
+          nodesInCluster.get(i).calculateUtilization(), 0.0001);
+    }
+
+    // should be equal to average utilization of the cluster
+    Assertions.assertEquals(averageUtilization,
+        containerBalancerTask.calculateAvgUtilization(nodesInCluster), 0.0001);
+  }
+
+  /**
+   * Checks whether ContainerBalancer is correctly updating the list of
+   * unBalanced nodes with varying values of Threshold.
+   */
+  @Test
+  public void
+      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException, TimeoutException {

Review Comment:
   These exceptions are not being thrown in the method body.



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
-  public void testCalculationOfUtilization() {
-    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
-    for (int i = 0; i < nodesInCluster.size(); i++) {
-      Assertions.assertEquals(nodeUtilizations.get(i),
-          nodesInCluster.get(i).calculateUtilization(), 0.0001);
-    }
-
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster
-    Assertions.assertEquals(averageUtilization,
-        containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001);
-  }
-
-  /**
-   * Checks whether ContainerBalancer is correctly updating the list of
-   * unBalanced nodes with varying values of Threshold.
-   */
-  @Test
-  public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    List<DatanodeUsageInfo> expectedUnBalancedNodes;
-    List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
-
-    // check for random threshold values
-    for (int i = 0; i < 50; i++) {
-      double randomThreshold = RANDOM.nextDouble() * 100;
-
-      balancerConfiguration.setThreshold(randomThreshold);
-      startBalancer(balancerConfiguration);
-
-      // waiting for balance completed.
-      // TODO: this is a temporary implementation for now
-      // modify this after balancer is fully completed
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) { }
-
-      expectedUnBalancedNodes =
-          determineExpectedUnBalancedNodes(randomThreshold);
-      unBalancedNodesAccordingToBalancer =
-          containerBalancer.getUnBalancedNodes();
-
-      stopBalancer();
-      Assertions.assertEquals(
-          expectedUnBalancedNodes.size(),
-          unBalancedNodesAccordingToBalancer.size());
-
-      for (int j = 0; j < expectedUnBalancedNodes.size(); j++) {
-        Assertions.assertEquals(
-            expectedUnBalancedNodes.get(j).getDatanodeDetails(),
-            unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails());
-      }
-    }
+    Assertions.assertFalse(doRun);
+    containerBalancer.saveConfiguration(balancerConfiguration, true, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertTrue(doRun);
+    containerBalancer.saveConfiguration(balancerConfiguration, false, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertFalse(doRun);
   }
 
-  /**
-   * Checks whether the list of unBalanced nodes is empty when the cluster is
-   * balanced.
-   */
   @Test
-  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(99.99);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(100);
-
-    stopBalancer();
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
-    Assertions.assertEquals(0, metrics.getNumDatanodesUnbalanced());
-  }
-
-  /**
-   * ContainerBalancer should not involve more datanodes than the
-   * maxDatanodesRatioToInvolvePerIteration limit.
-   */
-  @Test
-  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    int percent = 20;
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
-        percent);
-    balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    int number = percent * numberOfNodes / 100;
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertFalse(
-        containerBalancer.getCountDatanodesInvolvedPerIteration() > number);
-    Assertions.assertTrue(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
-    Assertions.assertFalse(
-        metrics.getNumDatanodesInvolvedInLatestIteration() > number);
-    stopBalancer();
-  }
+  public void testStartBalancerStop() throws Exception {
+    Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
+            Mockito.any(DatanodeDetails.class),
+            Mockito.any(DatanodeDetails.class)))
+        .thenReturn(genCompletableFuture(1000));
 
-  @Test
-  public void containerBalancerShouldSelectOnlyClosedContainers()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    // make all containers open, balancer should not select any of them
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
-    }
     balancerConfiguration.setThreshold(10);
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // balancer should have identified unbalanced nodes
-    Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
-    // no container should have been selected
-    Assertions.assertTrue(containerBalancer.getContainerToSourceMap()
-        .isEmpty());
-    /*
-    Iteration result should be CAN_NOT_BALANCE_ANY_MORE because no container
-    move is generated
-     */
-    Assertions.assertEquals(
-        ContainerBalancer.IterationResult.CAN_NOT_BALANCE_ANY_MORE,
-        containerBalancer.getIterationResult());
-
-    // now, close all containers
-    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
-      containerInfo.setState(HddsProtos.LifeCycleState.CLOSED);
-    }
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    // check whether all selected containers are closed
-    for (ContainerID cid:
-         containerBalancer.getContainerToSourceMap().keySet()) {
-      Assertions.assertSame(
-          cidToInfoMap.get(cid).getState(), HddsProtos.LifeCycleState.CLOSED);
-    }
-  }
-
-  @Test
-  public void containerBalancerShouldObeyMaxSizeToMoveLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(1);
-    balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT);
     balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    // balancer should not have moved more size than the limit
-    Assertions.assertFalse(
-        containerBalancer.getSizeScheduledForMoveInLatestIteration() >
-        10 * STORAGE_UNIT);
-
-    long size = containerBalancer.getMetrics()
-        .getDataSizeMovedGBInLatestIteration();
-    Assertions.assertTrue(size > 0);
-    Assertions.assertFalse(size > 10);
-    stopBalancer();
-  }
-
-  @Test
-  public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
+    balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);
     balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(1000);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> map =
-        containerBalancer.getContainerToTargetMap();
-    for (Map.Entry<ContainerID, DatanodeDetails> entry : map.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails target = entry.getValue();
-      Assertions.assertTrue(cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .noneMatch(target::equals));
-    }
-  }
+    balancerConfiguration.setMoveTimeout(Duration.ofMillis(1000));
 
-  @Test
-  public void containerMoveSelectionShouldFollowPlacementPolicy()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setIterations(1);
     startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    Map<ContainerID, DatanodeDetails> containerFromSourceMap =
-        containerBalancer.getContainerToSourceMap();
-    Map<ContainerID, DatanodeDetails> containerToTargetMap =
-        containerBalancer.getContainerToTargetMap();
-
-    // for each move selection, check if {replicas - source + target}
-    // satisfies placement policy
-    for (Map.Entry<ContainerID, DatanodeDetails> entry :
-        containerFromSourceMap.entrySet()) {
-      ContainerID container = entry.getKey();
-      DatanodeDetails source = entry.getValue();
-
-      List<DatanodeDetails> replicas = cidToReplicasMap.get(container)
-          .stream()
-          .map(ContainerReplica::getDatanodeDetails)
-          .collect(Collectors.toList());
-      // remove source and add target
-      replicas.remove(source);
-      replicas.add(containerToTargetMap.get(container));
-
-      ContainerInfo containerInfo = cidToInfoMap.get(container);
-      ContainerPlacementStatus placementStatus =
-          placementPolicy.validateContainerPlacement(replicas,
-              containerInfo.getReplicationConfig().getRequiredNodes());
-      Assertions.assertTrue(placementStatus.isPolicySatisfied());
-    }
-  }
-
-  @Test
-  public void targetDatanodeShouldBeInServiceHealthy()
-      throws NodeNotFoundException, IllegalContainerBalancerStateException,
-      IOException, InvalidContainerBalancerConfigurationException,
-      TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    for (DatanodeDetails target : containerBalancer.getSelectedTargets()) {
-      NodeStatus status = mockNodeManager.getNodeStatus(target);
-      Assertions.assertSame(HddsProtos.NodeOperationalState.IN_SERVICE,
-          status.getOperationalState());
-      Assertions.assertTrue(status.isHealthy());
+    try {
+      containerBalancer.startBalancer(balancerConfiguration);
+      Assertions.assertTrue(false,
+          "Exception should be thrown when startBalancer again");
+    } catch (IllegalContainerBalancerStateException e) {
+      // start failed again, valid case
     }
-  }
-
-  @Test
-  public void selectedContainerShouldNotAlreadyHaveBeenSelected()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, NodeNotFoundException,
-      TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
-    balancerConfiguration.setIterations(1);
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-
-    int numContainers = containerBalancer.getContainerToTargetMap().size();
-
-    /*
-    Assuming move is called exactly once for each unique container, number of
-     calls to move should equal number of unique containers. If number of
-     calls to move is more than number of unique containers, at least one
-     container has been re-selected. It's expected that number of calls to
-     move should equal number of unique, selected containers (from
-     containerToTargetMap).
-     */
-    Mockito.verify(replicationManager, times(numContainers))
-        .move(any(ContainerID.class), any(DatanodeDetails.class),
-            any(DatanodeDetails.class));
-  }
-
-  @Test
-  public void balancerShouldNotSelectConfiguredExcludeContainers()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
-    balancerConfiguration.setExcludeContainers("1, 4, 5");
 
-    startBalancer(balancerConfiguration);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    Set<ContainerID> excludeContainers =
-        balancerConfiguration.getExcludeContainers();
-    for (ContainerID container :
-        containerBalancer.getContainerToSourceMap().keySet()) {
-      Assertions.assertFalse(excludeContainers.contains(container));
+    try {
+      containerBalancer.start();
+      Assertions.assertTrue(false,
+          "Exception should be thrown when start again");
+    } catch (IllegalContainerBalancerStateException e) {
+      // start failed again, valid case
     }
-  }
-
-  @Test
-  public void balancerShouldObeyMaxSizeEnteringTargetLimit()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    conf.set("ozone.scm.container.size", "1MB");
-    balancerConfiguration =
-        conf.getObject(ContainerBalancerConfiguration.class);
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-    balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
-
-    // no containers should be selected when the limit is just 2 MB
-    balancerConfiguration.setMaxSizeEnteringTarget(2 * OzoneConsts.MB);
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-
-    Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
-    Assertions.assertTrue(containerBalancer.getContainerToSourceMap()
-        .isEmpty());
-    stopBalancer();
-
-    // some containers should be selected when using default values
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    ContainerBalancerConfiguration cbc = ozoneConfiguration.
-        getObject(ContainerBalancerConfiguration.class);
-    startBalancer(cbc);
-
-    sleepWhileBalancing(500);
-
-    stopBalancer();
-    // balancer should have identified unbalanced nodes
-    Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
-    Assertions.assertFalse(containerBalancer.getContainerToSourceMap()
-        .isEmpty());
-  }
-
-  @Test
-  public void testMetrics()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    conf.set("hdds.datanode.du.refresh.period", "1ms");
-    balancerConfiguration.setBalancingInterval(Duration.ofMillis(2));
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setIterations(1);
-    balancerConfiguration.setMaxSizeEnteringTarget(6 * STORAGE_UNIT);
-    // deliberately set max size per iteration to a low value, 6 GB
-    balancerConfiguration.setMaxSizeToMovePerIteration(6 * STORAGE_UNIT);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
-    stopBalancer();
-
-    ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
-    Assertions.assertEquals(determineExpectedUnBalancedNodes(
-            balancerConfiguration.getThreshold()).size(),
-        metrics.getNumDatanodesUnbalanced());
-    Assertions.assertTrue(metrics.getDataSizeMovedGBInLatestIteration() <= 6);
-    Assertions.assertEquals(1, metrics.getNumIterations());
-  }
-
-  /**
-   * Tests if {@link ContainerBalancer} follows the includeNodes and
-   * excludeNodes configurations in {@link ContainerBalancerConfiguration}.
-   * If the includeNodes configuration is not empty, only the specified
-   * includeNodes should be included in balancing. excludeNodes should be
-   * excluded from balancing. If a datanode is specified in both include and
-   * exclude configurations, then it should be excluded.
-   */
-  @Test
-  public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setIterations(1);
-    balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
 
-    // only these nodes should be included
-    // the ones also specified in excludeNodes should be excluded
-    int firstIncludeIndex = 0, secondIncludeIndex = 1;
-    int thirdIncludeIndex = nodesInCluster.size() - 2;
-    int fourthIncludeIndex = nodesInCluster.size() - 1;
-    String includeNodes =
-        nodesInCluster.get(firstIncludeIndex).getDatanodeDetails()
-            .getIpAddress() + ", " +
-            nodesInCluster.get(secondIncludeIndex).getDatanodeDetails()
-                .getIpAddress() + ", " +
-            nodesInCluster.get(thirdIncludeIndex).getDatanodeDetails()
-                .getHostName() + ", " +
-            nodesInCluster.get(fourthIncludeIndex).getDatanodeDetails()
-                .getHostName();
+    Assertions.assertTrue(containerBalancer.getBalancerStatus()
+        == ContainerBalancerTask.Status.RUNNING);
 
-    // these nodes should be excluded
-    int firstExcludeIndex = 0, secondExcludeIndex = nodesInCluster.size() - 1;
-    String excludeNodes =
-        nodesInCluster.get(firstExcludeIndex).getDatanodeDetails()
-            .getIpAddress() + ", " +
-            nodesInCluster.get(secondExcludeIndex).getDatanodeDetails()
-                .getHostName();
-
-    balancerConfiguration.setExcludeNodes(excludeNodes);
-    balancerConfiguration.setIncludeNodes(includeNodes);
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(500);
     stopBalancer();
+    Assertions.assertTrue(containerBalancer.getBalancerStatus()
+        == ContainerBalancerTask.Status.STOPPED);
 
-    // finally, these should be the only nodes included in balancing
-    // (included - excluded)
-    DatanodeDetails dn1 =
-        nodesInCluster.get(secondIncludeIndex).getDatanodeDetails();
-    DatanodeDetails dn2 =
-        nodesInCluster.get(thirdIncludeIndex).getDatanodeDetails();
-    Map<ContainerID, DatanodeDetails> containerFromSourceMap =
-        containerBalancer.getContainerToSourceMap();
-    Map<ContainerID, DatanodeDetails> containerToTargetMap =
-        containerBalancer.getContainerToTargetMap();
-    for (Map.Entry<ContainerID, DatanodeDetails> entry :
-        containerFromSourceMap.entrySet()) {
-      DatanodeDetails source = entry.getValue();
-      DatanodeDetails target = containerToTargetMap.get(entry.getKey());
-      Assertions.assertTrue(source.equals(dn1) || source.equals(dn2));
-      Assertions.assertTrue(target.equals(dn1) || target.equals(dn2));
+    try {
+      containerBalancer.stopBalancer();
+      Assertions.assertTrue(false,
+          "Exception should be thrown when stop again");
+    } catch (Exception e) {
+      // stop failed as already stopped, valid case
     }
   }
 
   @Test
-  public void testContainerBalancerConfiguration() {
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    ozoneConfiguration.set("ozone.scm.container.size", "5GB");
-    ozoneConfiguration.setDouble(
-        "hdds.container.balancer.utilization.threshold", 1);
-
-    ContainerBalancerConfiguration cbConf =
-        ozoneConfiguration.getObject(ContainerBalancerConfiguration.class);
-    Assertions.assertEquals(1, cbConf.getThreshold(), 0.001);
-
-    Assertions.assertEquals(26 * 1024 * 1024 * 1024L,
-        cbConf.getMaxSizeLeavingSource());
-
-    Assertions.assertEquals(30 * 60 * 1000,
-        cbConf.getMoveTimeout().toMillis());
-  }
-
-  @Test
-  public void checkIterationResult()
-      throws NodeNotFoundException, IOException,
-      IllegalContainerBalancerStateException,
-      InvalidContainerBalancerConfigurationException,
-      TimeoutException {
-    balancerConfiguration.setThreshold(10);
-    balancerConfiguration.setIterations(1);
-    balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);
-    balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
-    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
-
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(1000);
-
-    /*
-    According to the setup and configurations, this iteration's result should
-    be ITERATION_COMPLETED.
-     */
-    Assertions.assertEquals(
-        ContainerBalancer.IterationResult.ITERATION_COMPLETED,
-        containerBalancer.getIterationResult());
-    stopBalancer();
-
-    /*
-    Now, limit maxSizeToMovePerIteration but fail all container moves. The
-    result should still be ITERATION_COMPLETED.
-     */
-    Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
-            Mockito.any(DatanodeDetails.class),
-            Mockito.any(DatanodeDetails.class)))
-        .thenReturn(CompletableFuture.completedFuture(
-            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY));
-    balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT);
-
-    startBalancer(balancerConfiguration);
-    sleepWhileBalancing(1000);
-
-    Assertions.assertEquals(
-        ContainerBalancer.IterationResult.ITERATION_COMPLETED,
-        containerBalancer.getIterationResult());
-    stopBalancer();
-  }
-
-  @Test
-  public void checkIterationResultTimeout()
-      throws NodeNotFoundException, IOException,
-      IllegalContainerBalancerStateException,
-      InvalidContainerBalancerConfigurationException,
-      TimeoutException {
-
+  public void testStartStop() throws Exception {

Review Comment:
   Maybe we can change this test name to something more appropriate?
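The new testStartStop checks that a second startBalancer, a second start and a second stop are all rejected. It does this with try/catch and Assertions.assertTrue(false, ...) as the failure path. Below is a minimal, dependency-free sketch of the same lifecycle check. BalancerLifecycle and Expect are hypothetical stand-ins, not Ozone classes, and IllegalStateException stands in for IllegalContainerBalancerStateException. In JUnit 5, Expect.expectThrows would be replaced by Assertions.assertThrows. A name such as startAndStopShouldRejectRepeatedCalls (also hypothetical) would describe the test more precisely.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical lifecycle guard: start/stop may each succeed once per cycle. */
final class BalancerLifecycle {
  enum Status { RUNNING, STOPPED }

  private final ReentrantLock lock = new ReentrantLock();
  private Status status = Status.STOPPED; // guarded by lock

  /** Rejects a start while the balancer is already running. */
  void start() {
    lock.lock();
    try {
      if (status == Status.RUNNING) {
        throw new IllegalStateException("Balancer is already running");
      }
      status = Status.RUNNING;
    } finally {
      lock.unlock();
    }
  }

  /** Rejects a stop when the balancer is not running. */
  void stop() {
    lock.lock();
    try {
      if (status != Status.RUNNING) {
        throw new IllegalStateException("Balancer is not running");
      }
      status = Status.STOPPED;
    } finally {
      lock.unlock();
    }
  }

  Status getStatus() {
    lock.lock();
    try {
      return status;
    } finally {
      lock.unlock();
    }
  }
}

/** Hypothetical stand-in for JUnit 5's Assertions.assertThrows. */
final class Expect {
  private Expect() {}

  static <T extends Throwable> T expectThrows(Class<T> type, Runnable action) {
    try {
      action.run();
    } catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      throw new AssertionError("Unexpected exception: " + t, t);
    }
    // Reached only when the action completed without throwing.
    throw new AssertionError("Expected " + type.getSimpleName() + " to be thrown");
  }
}
```

Usage: call start(), check for RUNNING, then expectThrows(IllegalStateException.class, lifecycle::start). Call stop(), check for STOPPED, then expectThrows(IllegalStateException.class, lifecycle::stop). Each rejected call then fails the test on its own line, so no assertTrue(false, ...) is needed.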



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
-  public void testCalculationOfUtilization() {
-    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
-    for (int i = 0; i < nodesInCluster.size(); i++) {
-      Assertions.assertEquals(nodeUtilizations.get(i),
-          nodesInCluster.get(i).calculateUtilization(), 0.0001);
-    }
-
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster
-    Assertions.assertEquals(averageUtilization,
-        containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001);
-  }
-
-  /**
-   * Checks whether ContainerBalancer is correctly updating the list of
-   * unBalanced nodes with varying values of Threshold.
-   */
-  @Test
-  public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
-      throws IllegalContainerBalancerStateException, IOException,
-      InvalidContainerBalancerConfigurationException, TimeoutException {
-    List<DatanodeUsageInfo> expectedUnBalancedNodes;
-    List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
-
-    // check for random threshold values
-    for (int i = 0; i < 50; i++) {
-      double randomThreshold = RANDOM.nextDouble() * 100;
-
-      balancerConfiguration.setThreshold(randomThreshold);
-      startBalancer(balancerConfiguration);
-
-      // waiting for balance completed.
-      // TODO: this is a temporary implementation for now
-      // modify this after balancer is fully completed
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) { }
-
-      expectedUnBalancedNodes =
-          determineExpectedUnBalancedNodes(randomThreshold);
-      unBalancedNodesAccordingToBalancer =
-          containerBalancer.getUnBalancedNodes();
-
-      stopBalancer();
-      Assertions.assertEquals(
-          expectedUnBalancedNodes.size(),
-          unBalancedNodesAccordingToBalancer.size());
-
-      for (int j = 0; j < expectedUnBalancedNodes.size(); j++) {
-        Assertions.assertEquals(
-            expectedUnBalancedNodes.get(j).getDatanodeDetails(),
-            unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails());
-      }
-    }
+    Assertions.assertEquals(doRun, false);
+    containerBalancer.saveConfiguration(balancerConfiguration, true, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, true);
+    containerBalancer.saveConfiguration(balancerConfiguration, false, 0);
+    doRun = containerBalancer.shouldRun();
+    Assertions.assertEquals(doRun, false);

Review Comment:
   ```suggestion
       Assertions.assertFalse(doRun);
   ```
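testShouldRun relies on shouldRun() reporting the flag most recently passed to saveConfiguration. The sketch below assumes that contract; the actual persistence inside SCM is not shown in this thread. PersistedBalancerState is a hypothetical in-memory stand-in. It omits the ContainerBalancerConfiguration argument and simply stores the third argument (0 in the test), whose meaning is not stated here. With JUnit 5, each check maps directly onto assertFalse or assertTrue. That is clearer than assertEquals(doRun, false), which also passes the actual value where JUnit expects the expected one.

```java
/** Hypothetical in-memory stand-in for the persisted balancer state. */
final class PersistedBalancerState {
  private boolean shouldRun; // false until something is persisted
  private long index;        // stored as-is; meaning not stated in the thread

  /** Mirrors the shape of saveConfiguration(config, shouldRun, index), minus config. */
  synchronized void saveConfiguration(boolean shouldRun, long index) {
    this.shouldRun = shouldRun;
    this.index = index;
  }

  /** Reports the most recently persisted flag. */
  synchronized boolean shouldRun() {
    return shouldRun;
  }

  synchronized long getIndex() {
    return index;
  }
}
```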



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java:
##########
@@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException,
   }
 
   @Test
-  public void testCalculationOfUtilization() {
-    Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size());
-    for (int i = 0; i < nodesInCluster.size(); i++) {
-      Assertions.assertEquals(nodeUtilizations.get(i),
-          nodesInCluster.get(i).calculateUtilization(), 0.0001);
-    }
-
+  public void testShouldRun() throws Exception {
+    boolean doRun = containerBalancer.shouldRun();
     // should be equal to average utilization of the cluster

Review Comment:
   This comment is no longer valid: testShouldRun no longer checks the average utilization of the cluster.



[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r990393214


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -892,34 +79,38 @@ private void resetState() {
    */
   @Override
   public void notifyStatusChanged() {
-    boolean shouldStop = false;
-    boolean shouldRun = false;
+    if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+      boolean shouldStop;
+      lock.lock();
+      try {
+        shouldStop = canBalancerStop();
+      } finally {
+        lock.unlock();
+      }
+      if (shouldStop) {
+        LOG.info("Stopping ContainerBalancer in this scm on status change");
+        stop();
+      }
+      return;
+    }
+
     lock.lock();
     try {
-      if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
-        shouldStop = isBalancerRunning();
-      } else {
-        shouldRun = shouldRun();
+      // else check for start
+      boolean shouldRun = shouldRun();
+      if (shouldRun && canBalancerStart()) {

Review Comment:
   The logic is the same as before, with an additional in-memory check of the balancer:
   1. Check whether the balancer should run, based on the persisted state.
   2. Then check whether it is already running in memory.
   Only then decide whether it can be started.
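The decision described in this comment can be modelled as a pure function of the SCM status and the balancer state. This is a minimal sketch, not the Ozone code. StatusChangeDecision and Action are hypothetical. persistedShouldRun stands for the result of shouldRun(), and runningInMemory stands for the in-memory check (canBalancerStart/canBalancerStop in the diff). In the patch this check runs under the lock. A STOP decision is acted on only after the lock is released, because stop() joins the balancing thread.

```java
/** Hypothetical decision model for notifyStatusChanged; not the Ozone code. */
final class StatusChangeDecision {
  enum Action { START, STOP, NONE }

  private StatusChangeDecision() {}

  /**
   * @param isLeader whether this SCM is the Ratis leader
   * @param inSafeMode whether this SCM is in safe mode
   * @param persistedShouldRun step 1: the durable "should run" flag
   * @param runningInMemory step 2: whether a balancing task is active here
   */
  static Action decide(boolean isLeader, boolean inSafeMode,
      boolean persistedShouldRun, boolean runningInMemory) {
    if (!isLeader || inSafeMode) {
      // Not allowed to balance in this SCM: stop only if something is running.
      return runningInMemory ? Action.STOP : Action.NONE;
    }
    // Allowed to balance: start only if requested and not already running.
    return persistedShouldRun && !runningInMemory ? Action.START : Action.NONE;
  }
}
```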



[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r988739541


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -893,32 +80,36 @@ private void resetState() {
   @Override
   public void notifyStatusChanged() {
     boolean shouldStop = false;
-    boolean shouldRun = false;
     lock.lock();
     try {
       if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
-        shouldStop = isBalancerRunning();
-      } else {
-        shouldRun = shouldRun();
+        shouldStop = canBalancerStop();
       }
     } finally {
       lock.unlock();
     }
-
     if (shouldStop) {
       LOG.info("Stopping ContainerBalancer in this scm on status change");
       stop();
+      return;
     }
 
-    if (shouldRun) {
-      LOG.info("Starting ContainerBalancer in this scm on status change");
-      try {
-        start();
-      } catch (IllegalContainerBalancerStateException |
-          InvalidContainerBalancerConfigurationException e) {
-        LOG.warn("Could not start ContainerBalancer on raft/safe-mode " +
-            "status change.", e);
+    lock.lock();

Review Comment:
   > It is a bit inconsistent that we need to take lock for start but not for stop call above.
   
   This is intentional: stop() makes a blocking join() call that waits for the balancing thread to finish, which can take a long time. The lock is therefore taken inside stop() at a finer granularity, so that the join() happens outside the locked region.
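Below is a minimal sketch of that locking pattern, under stated assumptions. GranularStopWorker is hypothetical and is not ContainerBalancer. The lock guards only the status change and the capture of the thread reference, and join() runs without it. The worker thread marks itself STOPPED only as it exits. As a result, a start() that arrives during a long join() is rejected instead of racing with the old thread. That start/stop/start hazard is the one this PR targets.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical worker whose stop() never holds the lock while joining. */
final class GranularStopWorker {
  enum Status { RUNNING, STOPPING, STOPPED }

  private final ReentrantLock lock = new ReentrantLock();
  private Status status = Status.STOPPED; // guarded by lock
  private Thread worker;                  // guarded by lock

  void start(Runnable iteration) {
    lock.lock();
    try {
      // Also rejects a restart while a previous thread is still stopping.
      if (status != Status.STOPPED) {
        throw new IllegalStateException("Cannot start in state " + status);
      }
      status = Status.RUNNING;
      worker = new Thread(() -> runLoop(iteration), "balancer-sketch");
      worker.start();
    } finally {
      lock.unlock();
    }
  }

  private void runLoop(Runnable iteration) {
    try {
      while (getStatus() == Status.RUNNING
          && !Thread.currentThread().isInterrupted()) {
        iteration.run();
      }
    } finally {
      lock.lock();
      try {
        status = Status.STOPPED; // only the exiting thread marks STOPPED
      } finally {
        lock.unlock();
      }
    }
  }

  /** Must not be called from the worker thread itself (it would join itself). */
  void stop() {
    Thread toJoin;
    lock.lock();
    try {
      if (status != Status.RUNNING) {
        throw new IllegalStateException("Cannot stop in state " + status);
      }
      status = Status.STOPPING;
      toJoin = worker;
    } finally {
      lock.unlock();
    }
    toJoin.interrupt();
    try {
      toJoin.join(); // potentially long wait, done without holding the lock
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  Status getStatus() {
    lock.lock();
    try {
      return status;
    } finally {
      lock.unlock();
    }
  }
}
```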
   



[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3782: HDDS-7214. Continuous start & stop can have hanging threads in stopping

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3782:
URL: https://github.com/apache/ozone/pull/3782#discussion_r988740996


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java:
##########
@@ -1137,45 +351,21 @@ public void stopBalancer()
       TimeoutException {
     lock.lock();
     try {
-      // should be leader, out of safe mode, and currently running
       validateState(true);
-      saveConfiguration(config, false, 0);

Review Comment:
   > saveConfiguration could end up happening outside lock if executed in the ContainerBalancerTask.
   
   This is synchronized through the running-status control:
   
   1. saveConfiguration is called from the CLI start path only while ContainerBalancerTask is not running (only the CLI needs to record this flag).
   2. ContainerBalancerTask calls saveConfiguration only in the stopping state, when the task is about to stop.
   
   Given this ordering, ContainerBalancerTask does not need to take the lock.
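The ordering argument can be made concrete with a small model. In this sketch, BalancerLifecycleModel and its methods are hypothetical and are not the Ozone classes. The CLI persists only while the status is STOPPED, inside the same locked section that publishes RUNNING. The task persists only while the status is STOPPING, and it does so without taking the lock. The CLI cannot start again until the task has marked STOPPED, so the two writers never overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of status-ordered persistence; not the Ozone classes. */
final class BalancerLifecycleModel {
  enum Status { RUNNING, STOPPING, STOPPED }

  private final ReentrantLock lock = new ReentrantLock();
  // Written only under the lock; volatile so the task can read it without the lock.
  private volatile Status status = Status.STOPPED;
  // Stand-in for persisted state; writers are kept apart by the status protocol.
  private final List<String> persisted = new ArrayList<>();

  /** CLI path: persist while no task exists, then publish RUNNING. */
  void startFromCli() {
    lock.lock();
    try {
      if (status != Status.STOPPED) {
        throw new IllegalStateException("Cannot start in state " + status);
      }
      persisted.add("cli: shouldRun=true");
      status = Status.RUNNING;
    } finally {
      lock.unlock();
    }
  }

  /** Stop request: flips the status only; persisting is left to the task. */
  void requestStop() {
    lock.lock();
    try {
      if (status != Status.RUNNING) {
        throw new IllegalStateException("Cannot stop in state " + status);
      }
      status = Status.STOPPING;
    } finally {
      lock.unlock();
    }
  }

  /** Task exit path: persists without the lock, relying on the STOPPING status. */
  void taskExit() {
    if (status != Status.STOPPING) {
      throw new IllegalStateException("Task may only persist while STOPPING");
    }
    // The CLI cannot write now, because startFromCli() requires STOPPED.
    persisted.add("task: shouldRun=false");
    lock.lock();
    try {
      status = Status.STOPPED;
    } finally {
      lock.unlock();
    }
  }

  Status getStatus() {
    return status;
  }

  /** Snapshot for single-threaded inspection in this sketch. */
  List<String> persistedWrites() {
    return new ArrayList<>(persisted);
  }
}
```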


