You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/05/12 09:20:04 UTC

[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

lokeshj1703 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630786402



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {

Review comment:
       Can we divide this function further into smaller functions, such as calculateAverageUtilisation, identifyOverAndUnderUtilizedNodes, etc.? Then we can avoid one large function.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+
+    // calculate total number of nodes that have been balanced
+    numDatanodesBalanced =
+        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();

Review comment:
       We can add another method in metrics to increment a metric; then we do not need to fetch the metric value.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);

Review comment:
       I think we will also need to handle safe mode. The balancer should not operate when SCM is in safe mode.

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestContainerBalancer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancer.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManager;
+  private ContainerBalancer containerBalancer;
+  private MockNodeManager mockNodeManager;
+  private OzoneConfiguration conf;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    containerManager = Mockito.mock(ContainerManagerV2.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+
+    balancerConfiguration = new ContainerBalancerConfiguration();
+    balancerConfiguration.setThreshold("0.1");
+    balancerConfiguration.setMaxDatanodesToBalance(10);
+    balancerConfiguration.setMaxSizeToMove(500L);
+    conf.setFromObject(balancerConfiguration);
+
+    this.numberOfNodes = 10;
+    generateUtilizations(numberOfNodes);

Review comment:
       Nit: we can move the generateUtilizations function inside createNodesInCluster.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -45,9 +48,27 @@
   private long maxSizeToMove;
   private boolean balancerRunning;
   private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> aboveAverageUtilizedNodes;
+  private List<DatanodeUsageInfo> belowAverageUtilizedNodes;

Review comment:
       Can we avoid having these lists? I think it might be simpler to maintain another list or set of nodes which are not categorised as either over- or under-utilized.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -45,58 +50,75 @@
   private int maxDatanodesToBalance = 5;
 
   @Config(key = "size.moved.max", type = ConfigType.LONG,
-      defaultValue = "10737418240L", tags = {ConfigTag.BALANCER},
-      description = "The maximum size of data in Bytes that will be moved " +
-          "by the Container Balancer.")
-  private long maxSizeToMove = 10737418240L;
+      defaultValue = "10", tags = {ConfigTag.BALANCER},
+      description = "The maximum size of data in GB that will be moved " +
+          "by Container Balancer.")
+  private long maxSizeToMove = 10;
 
   /**
-   * Get the threshold value for Container Balancer.
+   * Gets the threshold value for Container Balancer.
+   *
    * @return a fraction in the range 0 to 1
    */
   public double getThreshold() {
-    return threshold;
+    return Double.parseDouble(threshold);
   }
 
   /**
-   * Set the threshold value for Container Balancer.
+   * Sets the threshold value for Container Balancer.
+   *
    * @param threshold a fraction in the range 0 to 1
    */
-  public void setThreshold(double threshold) {
+  public void setThreshold(String threshold) {

Review comment:
       It can take a double as its argument.

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
##########
@@ -139,6 +137,37 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
         initializeFakeNodes, nodeCount);
   }
 
+  public MockNodeManager(List<DatanodeUsageInfo> nodes)

Review comment:
       The common functionality with the other constructor can be extracted to a separate function.

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestContainerBalancer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancer.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManager;
+  private ContainerBalancer containerBalancer;
+  private MockNodeManager mockNodeManager;
+  private OzoneConfiguration conf;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    containerManager = Mockito.mock(ContainerManagerV2.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+
+    balancerConfiguration = new ContainerBalancerConfiguration();
+    balancerConfiguration.setThreshold("0.1");
+    balancerConfiguration.setMaxDatanodesToBalance(10);
+    balancerConfiguration.setMaxSizeToMove(500L);
+    conf.setFromObject(balancerConfiguration);
+
+    this.numberOfNodes = 10;
+    generateUtilizations(numberOfNodes);
+
+    // create datanodes with the generated nodeUtilization values
+    this.averageUtilization = createNodesInCluster();
+    mockNodeManager = new MockNodeManager(nodesInCluster);
+    containerBalancer = new ContainerBalancer(mockNodeManager, containerManager,
+        replicationManager, conf);
+  }
+
+  /**
+   * Checks whether ContainerBalancer is correctly updating the list of source
+   * nodes with varying values of Threshold.
+   */
+  @Test
+  public void initializeIterationShouldUpdateSourceNodesWhenThresholdChanges() {
+    List<DatanodeUsageInfo> expectedSourceNodes;
+    List<DatanodeUsageInfo> sourceNodesAccordingToBalancer;
+
+    // check for random threshold values
+    for (int i = 0; i < 50; i++) {
+      double randomThreshold = Math.random();
+
+      balancerConfiguration.setThreshold(String.valueOf(randomThreshold));
+      containerBalancer.start(balancerConfiguration);
+      expectedSourceNodes = determineExpectedSourceNodes(randomThreshold);
+      sourceNodesAccordingToBalancer = containerBalancer.getSourceNodes();
+
+      Assert.assertEquals(
+          expectedSourceNodes.size(), sourceNodesAccordingToBalancer.size());
+
+      for (int j = 0; j < expectedSourceNodes.size(); j++) {
+        Assert.assertEquals(expectedSourceNodes.get(j).getDatanodeDetails(),
+            sourceNodesAccordingToBalancer.get(j).getDatanodeDetails());
+      }
+      containerBalancer.stop();
+    }
+
+  }
+
+  /**
+   * Checks whether the list of source is empty when the cluster is balanced.
+   */
+  @Test
+  public void sourceNodesListShouldBeEmptyWhenClusterIsBalanced() {
+    balancerConfiguration.setThreshold("0.99");
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertEquals(0, containerBalancer.getSourceNodes().size());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Checks whether ContainerBalancer stops when the limit of
+   * MaxDatanodesToBalance is reached.
+   */
+  @Test
+  public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
+    balancerConfiguration.setMaxDatanodesToBalance(2);
+    balancerConfiguration.setThreshold("0");
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertFalse(containerBalancer.isBalancerRunning());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Determines source nodes, that is, over and under utilized nodes,
+   * according to the generated utilization values for nodes and the threshold.
+   *
+   * @param threshold A fraction from range 0 to 1.
+   * @return List of DatanodeUsageInfo containing the expected(correct)
+   * source nodes.
+   */
+  private List<DatanodeUsageInfo> determineExpectedSourceNodes(
+      double threshold) {
+    double lowerLimit = averageUtilization - threshold;
+    double upperLimit = averageUtilization + threshold;
+
+    // use node utilizations to determine over and under utilized nodes
+    List<DatanodeUsageInfo> expectedSourceNodes = new ArrayList<>();
+    for (int i = 0; i < numberOfNodes; i++) {
+      if (nodeUtilizations.get(numberOfNodes - i - 1) > upperLimit) {

Review comment:
       Can we use nodesInCluster directly to determine the source nodes?

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -45,58 +50,75 @@
   private int maxDatanodesToBalance = 5;
 
   @Config(key = "size.moved.max", type = ConfigType.LONG,

Review comment:
       We can use ConfigType.SIZE here.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+
+    // calculate total number of nodes that have been balanced
+    numDatanodesBalanced =
+        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);
+
+      if (sourceNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(

Review comment:
       The comparator can return 0 for two different datanodes with the same utilisation. We will need to handle that case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org