Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/05/10 12:13:15 UTC

[GitHub] [ozone] siddhantsangwan opened a new pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

siddhantsangwan opened a new pull request #2230:
URL: https://github.com/apache/ozone/pull/2230


   ## What changes were proposed in this pull request?
   
   `ContainerBalancer` will identify over-utilized, under-utilized, above-average utilized, and below-average utilized nodes at the start of each iteration and, based on this classification, decide whether balancing should continue. This PR also adds unit tests for this functionality.
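
   A minimal sketch of that classification, not the PR's code. It assumes three things: utilization is used / capacity; the cluster average is total used over total capacity; and balancing continues only while at least one node lies outside [average - threshold, average + threshold]. `NodeClassifier` and its methods are hypothetical names.

```java
/**
 * Hypothetical sketch of the per-iteration classification. Utilization is
 * used / capacity, and the threshold is expressed as a ratio, e.g. 0.1.
 */
class NodeClassifier {
  enum Category {
    OVER_UTILIZED, ABOVE_AVERAGE, AVERAGE, BELOW_AVERAGE, UNDER_UTILIZED
  }

  static Category classify(double utilization, double average,
      double threshold) {
    if (utilization > average + threshold) {
      return Category.OVER_UTILIZED;
    } else if (utilization > average) {
      return Category.ABOVE_AVERAGE;
    } else if (utilization < average - threshold) {
      return Category.UNDER_UTILIZED;
    } else if (utilization < average) {
      return Category.BELOW_AVERAGE;
    }
    return Category.AVERAGE;
  }

  /** Assumed rule: keep balancing while any node is over- or under-utilized. */
  static boolean shouldContinue(long[] used, long[] capacity,
      double threshold) {
    long totalUsed = 0;
    long totalCapacity = 0;
    for (int i = 0; i < used.length; i++) {
      totalUsed += used[i];
      totalCapacity += capacity[i];
    }
    if (totalCapacity == 0) {
      return false; // nothing to balance in a cluster with no capacity
    }
    double average = (double) totalUsed / totalCapacity;
    for (int i = 0; i < used.length; i++) {
      Category c = classify((double) used[i] / capacity[i], average, threshold);
      if (c == Category.OVER_UTILIZED || c == Category.UNDER_UTILIZED) {
        return true;
      }
    }
    return false;
  }
}
```

   For example, two equal-capacity nodes at 90% and 10% give an average of 50%. With a 10% threshold, the first node is over-utilized and the second is under-utilized, so `shouldContinue` returns true.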
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-4927
   
   ## How was this patch tested?
   
   Added the unit test class `TestContainerBalancer`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] siddhantsangwan commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-845055091


   Thanks for the reviews. I have addressed the comments.



[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r631636981



##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
##########
@@ -139,6 +137,37 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
         initializeFakeNodes, nodeCount);
   }
 
+  public MockNodeManager(List<DatanodeUsageInfo> nodes)

Review comment:
       Extracted the initialization into an initializer block.
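
   For reference, a Java instance initializer block runs in every constructor, right after the superclass constructor call and before the constructor's own body. Setup shared by several constructors can therefore live in one place. The class below is a hypothetical illustration, not `MockNodeManager` itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical class showing an instance initializer shared by constructors. */
class FakeNodeRegistry {
  private final List<String> healthyNodes;
  private final List<String> staleNodes;

  // Instance initializer: runs for every constructor below, so the final
  // fields are assigned exactly once regardless of which constructor is used.
  {
    healthyNodes = new ArrayList<>();
    staleNodes = new ArrayList<>();
  }

  FakeNodeRegistry() {
    // No extra setup; the initializer block has already created the lists.
  }

  FakeNodeRegistry(List<String> nodes) {
    healthyNodes.addAll(nodes); // safe: the list already exists here
  }

  int healthyCount() {
    return healthyNodes.size();
  }

  int staleCount() {
    return staleNodes.size();
  }
}
```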





[GitHub] [ozone] GlenGeng commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
GlenGeng commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-847825310


   LGTM. We can merge this PR for now, and start our future development based on it.



[GitHub] [ozone] JacksonYao287 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
JacksonYao287 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r639351757



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,347 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {

Review comment:
       Maybe it is better to move these configuration initialization operations to the constructor, so that `start` only does the start work and takes no parameters.
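
   A rough sketch of that suggestion. It assumes the configuration values stay fixed for the lifetime of the object, and `BalancerSketch` is a hypothetical class. It uses `AtomicBoolean.compareAndSet` so that two concurrent `start()` calls cannot both succeed. It returns `false` instead of throwing when the balancer is already running, which matches the `boolean start(...)` signature in the newer revision of the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical balancer whose configuration is fixed at construction. */
class BalancerSketch {
  private final double threshold;
  private final int maxDatanodesToBalance;
  private final AtomicBoolean running = new AtomicBoolean(false);

  BalancerSketch(double threshold, int maxDatanodesToBalance) {
    this.threshold = threshold;
    this.maxDatanodesToBalance = maxDatanodesToBalance;
  }

  /** Returns false if the balancer was already running. */
  boolean start() {
    if (!running.compareAndSet(false, true)) {
      return false;
    }
    // Balancing iterations would be scheduled here using the stored config.
    return true;
  }

  void stop() {
    running.set(false);
  }

  boolean isRunning() {
    return running.get();
  }

  double getThreshold() {
    return threshold;
  }

  int getMaxDatanodesToBalance() {
    return maxDatanodesToBalance;
  }
}
```

   One trade-off: if operators need to pass different settings for each run, keeping a configuration parameter on `start` (or adding a setter) may still be preferable.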





[GitHub] [ozone] lokeshj1703 commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-846913805


   Hypothetically, if you have 5 nodes with the same utilisation, `containsNode` might not return true with the current change, because binary search can return the index of any node with the same utilisation.
   
   `return index >= 0 && listToSearch.get(index).equals(node);`
   
   This logic would return false, since the returned datanode might have a different ID.
   
   The comparator would need to handle the case of equal utilisation.
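
   A small illustration of the problem, using a hypothetical `Node` class in place of `DatanodeUsageInfo`. With a comparator that only looks at utilisation, `Collections.binarySearch` may land on any of the equal elements. One possible fix, assumed here, is to add an ID tie-breaker so that every node has a unique position. The list must then be sorted with that same comparator.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical demo of binary search with and without a tie-breaker. */
class TieBreakDemo {
  static final class Node {
    final String id;
    final double utilization;

    Node(String id, double utilization) {
      this.id = id;
      this.utilization = utilization;
    }
  }

  // Orders by utilization only: nodes with equal utilization compare as 0.
  static final Comparator<Node> BY_UTILIZATION =
      Comparator.comparingDouble(n -> n.utilization);

  // Breaks ties on the node ID, so distinct nodes never compare as equal.
  static final Comparator<Node> BY_UTILIZATION_THEN_ID =
      BY_UTILIZATION.thenComparing(n -> n.id);

  /** Requires {@code sorted} to be sorted with the same {@code cmp}. */
  static boolean containsNode(List<Node> sorted, Node node,
      Comparator<Node> cmp) {
    int index = Collections.binarySearch(sorted, node, cmp);
    return index >= 0 && sorted.get(index).id.equals(node.id);
  }
}
```

   Consider five nodes, all at 0.5 utilisation, sorted by `BY_UTILIZATION`. In the JDK's implementation, searching for the first node probes the middle element, which compares as equal. `containsNode` therefore returns false even though the node is in the list. Sorting and searching with `BY_UTILIZATION_THEN_ID` avoids this.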



[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629851887



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,305 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    boolean initialized = initializeIteration();

Review comment:
       Yes, thanks for pointing this out.





[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630722914



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+

Review comment:
       In that case, the newly added DN will be under-utilized. The balancer will recognize this and then try to move data into it from the other 10 DNs, which are now above-average utilized. However, the exact algorithm for choosing a particular source DN and the containers to move is still in progress.
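
   A worked example of that scenario, using assumed numbers that do not appear in the thread: 10 DNs of equal capacity C, each 60% full, one new empty DN of the same capacity, and a threshold t of 10%. Because all capacities are equal, the capacity-weighted average and the plain mean of the per-node ratios coincide.

```latex
\begin{aligned}
\bar{u} &= \frac{\sum_i \mathrm{used}_i}{\sum_i \mathrm{capacity}_i}
         = \frac{10 \times 0.60\,C + 0}{11\,C} = \frac{6}{11} \approx 0.545 \\
\mathrm{lowerLimit} &= \bar{u} - t \approx 0.545 - 0.10 = 0.445 \\
\mathrm{upperLimit} &= \bar{u} + t \approx 0.545 + 0.10 = 0.645 \\
u_{\mathrm{old}} &= 0.60 \in (\bar{u},\ \mathrm{upperLimit}]
  \Rightarrow \text{above-average utilized} \\
u_{\mathrm{new}} &= 0 < \mathrm{lowerLimit}
  \Rightarrow \text{under-utilized}
\end{aligned}
```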





[GitHub] [ozone] lokeshj1703 commented on pull request #2230: HDDS-4927. Determine over and under utilized datanodes in Container Balancer.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-849364282


   @siddhantsangwan Thanks for the contribution! @linyiqun @GlenGeng @JacksonYao287 Thanks for the reviews! I have committed the PR to the master branch.



[GitHub] [ozone] GlenGeng commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
GlenGeng commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629971934



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {

Review comment:
       Why catch an NPE here?
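
   One possible alternative, sketched with hypothetical names: treat a null or empty node list as an explicit "cannot initialize" result instead of catching `NullPointerException`, which usually signals a programming error. The `Supplier` stands in for the call to `nodeManager.getMostOrLeastUsedDatanodes(true)`. Whether that method can actually return null is an assumption.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helpers that check for missing nodes instead of catching NPE. */
class NodeFetchSketch {
  // Normalizes a possibly-null result to an empty list, so callers never
  // need to catch NullPointerException.
  static <T> List<T> nodesOrEmpty(Supplier<List<T>> fetch) {
    List<T> nodes = fetch.get();
    return nodes == null ? Collections.<T>emptyList() : nodes;
  }

  // Mirrors the "return false and stop" path: an iteration can only be
  // initialized when at least one node was returned.
  static <T> boolean canInitializeIteration(Supplier<List<T>> fetch) {
    return !nodesOrEmpty(fetch).isEmpty();
  }
}
```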

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+

Review comment:
       NIT: merge lines 166 and 167 together.
   What if `clusterAvgUtilisation` is less than `threshold`, e.g., for an empty cluster? Does a negative `lowerLimit` make sense here?
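
   A minimal sketch of clamping the limits. It assumes utilization and threshold are both ratios in [0, 1], and `LimitsSketch` is a hypothetical class. Utilization is never negative, so a negative `lowerLimit` already means that no node is classified as under-utilized. Clamping mainly keeps the logged limits meaningful.

```java
/** Hypothetical helpers that keep utilization limits inside [0, 1]. */
class LimitsSketch {
  static double lowerLimit(double average, double threshold) {
    // Never below 0: a negative limit has no meaning for a used/capacity ratio.
    return Math.max(0.0, average - threshold);
  }

  static double upperLimit(double average, double threshold) {
    // Never above 1: a node cannot be more than 100% utilized.
    return Math.min(1.0, average + threshold);
  }
}
```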

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&

Review comment:
       You can use `CollectionUtils.intersection()` and `CollectionUtils.union()` to simplify the code.
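
   `CollectionUtils` (Apache Commons Collections) provides `intersection()`, `union()`, and `subtract()`. The sketch below shows the same idea with plain `java.util.Set` operations, so it needs no extra dependency. Node IDs stand in for `DatanodeUsageInfo`. The sketch assumes that a previous source node counts as balanced once it is no longer over-utilized; this is based on the visible part of the loop, and the rest of that condition is cut off in the diff. With real `DatanodeUsageInfo` objects, this approach relies on `equals`/`hashCode` reflecting datanode identity.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical set-based replacement for per-element containsNode() checks. */
class SetOpsSketch {
  /** Previous source nodes that are no longer over-utilized (set difference). */
  static Set<String> balancedSinceLastIteration(List<String> previousSources,
      List<String> currentOverUtilized) {
    Set<String> balanced = new HashSet<>(previousSources);
    balanced.removeAll(currentOverUtilized);
    return balanced;
  }

  /** Nodes present in both lists (set intersection). */
  static Set<String> intersection(List<String> a, List<String> b) {
    Set<String> result = new HashSet<>(a);
    result.retainAll(b);
    return result;
  }
}
```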

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+
+    // calculate total number of nodes that have been balanced
+    numDatanodesBalanced =
+        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);
+
+      if (sourceNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(

Review comment:
       NIT. `containsNode` is called multiple times, so it would be simpler and quicker to change `listToSearch` to a hash set and do the existence check there.
   
   For example, declare `overUtilizedNodes` and `underUtilizedNodes` as hash sets.
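A minimal sketch of the hash-set approach the reviewer describes, under stated assumptions. It is not part of the PR. Nodes are identified by hypothetical `String` IDs, each node's utilization (used / capacity) is precomputed, and the class name `UtilizationSets` is invented for illustration. Membership then becomes an expected constant-time `contains` call rather than a binary search.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class UtilizationSets {
  final Set<String> over = new HashSet<>();
  final Set<String> under = new HashSet<>();

  /** Buckets node IDs by utilization against the lower and upper limits. */
  UtilizationSets(Map<String, Double> utilizationById,
      double lowerLimit, double upperLimit) {
    for (Map.Entry<String, Double> e : utilizationById.entrySet()) {
      double u = e.getValue();
      if (u > upperLimit) {
        over.add(e.getKey());
      } else if (u < lowerLimit) {
        under.add(e.getKey());
      }
      // Nodes within [lowerLimit, upperLimit] fall in neither set.
    }
  }

  /** Hash lookups instead of a binary-search containsNode. */
  boolean isUnbalanced(String nodeId) {
    return over.contains(nodeId) || under.contains(nodeId);
  }
}
```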

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);

Review comment:
       `ArithmeticException` means `nodes` is empty, which leads to a division by zero. How about skipping this iteration if `nodes` is empty? Say:
   
   ```
   nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
   if (nodes.isEmpty()) {
     return true;
   }
   ```
   
   The balancer should not work if SCM hasn't heard from any datanodes.
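One way to make the empty-cluster case explicit instead of relying on an exception is to have the averaging step return an `OptionalDouble`. This matters because, in Java, floating-point division by zero yields `NaN` or `Infinity` rather than throwing. Only integer division throws `ArithmeticException`. The sketch below is hypothetical. It assumes the cluster average is total used bytes divided by total capacity, and it takes plain `long` arrays instead of `DatanodeUsageInfo`. The actual `calculateAvgUtilization` in the PR may compute it differently.

```java
import java.util.OptionalDouble;

final class AvgUtilization {
  private AvgUtilization() { }

  /**
   * Hypothetical cluster-wide utilization: total used / total capacity.
   * Returns empty for an empty cluster or zero total capacity.
   */
  static OptionalDouble of(long[] used, long[] capacity) {
    if (used.length != capacity.length) {
      throw new IllegalArgumentException("used and capacity must align");
    }
    if (used.length == 0) {
      // SCM has not heard from any datanode yet: nothing to balance.
      return OptionalDouble.empty();
    }
    long totalUsed = 0L;
    long totalCapacity = 0L;
    for (int i = 0; i < used.length; i++) {
      totalUsed += used[i];
      totalCapacity += capacity[i];
    }
    if (totalCapacity == 0L) {
      return OptionalDouble.empty();
    }
    return OptionalDouble.of((double) totalUsed / totalCapacity);
  }
}
```

The caller still has to choose what an empty result means, either skipping the iteration (`return true`) or reporting failure (`return false`).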

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());

Review comment:
       NIT. Merge the two info logs. In a multi-threaded context, other log lines might be interleaved between lines 114 and 115.
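A sketch of the merged log call, not taken from the PR. The header and the balancer state are emitted as a single log record, so another thread's output cannot land between them. The sketch uses `java.util.logging` to stay self-contained. With the SLF4J logger in `ContainerBalancer`, a rough equivalent would be a single `LOG.info("Starting Container Balancer...{}{}", System.lineSeparator(), this)` call. `BalancerStartupLog` and its methods are hypothetical names.

```java
import java.util.logging.Logger;

final class BalancerStartupLog {
  private static final Logger LOG =
      Logger.getLogger(BalancerStartupLog.class.getName());

  private BalancerStartupLog() { }

  /** Builds the header and the state dump as one message. */
  static String startupMessage(Object balancerState) {
    return "Starting Container Balancer..." + System.lineSeparator()
        + balancerState;
  }

  /** A single call produces a single log record. */
  static void logStart(Object balancerState) {
    LOG.info(startupMessage(balancerState));
  }
}
```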

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;

Review comment:
       NIT. Rename this to `nodeUsageInfos`; `nodes` is misleading here.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {

Review comment:
       NIT. Better to use a lock-free variable to avoid contention, and to log an error instead of throwing a `RuntimeException`.
   
   ```
   private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
   
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
     if (!balancerRunning.compareAndSet(false, true)) {
       LOG.error("Container Balancer is already running.");
       return;
     }
   
     // ...
   }
   ```
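The reviewer's `compareAndSet` pattern can be exercised in isolation with the minimal sketch below. It is not part of the PR, and the class name `BalancerRunGuard` is hypothetical. Only the caller that flips the flag from `false` to `true` gets to start, and only the caller that flips it back gets to stop. No lock is held, so there is no contention on a monitor.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class BalancerRunGuard {
  private final AtomicBoolean running = new AtomicBoolean(false);

  /** True only for the single caller that moves false -> true. */
  boolean tryStart() {
    return running.compareAndSet(false, true);
  }

  /** True only if the balancer was running and this call stopped it. */
  boolean tryStop() {
    return running.compareAndSet(true, false);
  }

  boolean isRunning() {
    return running.get();
  }
}
```

A `start()` method would then log an error and return early when `tryStart()` returns false, as in the reviewer's snippet.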






[GitHub] [ozone] JacksonYao287 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
JacksonYao287 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r639537558



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,347 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {

Review comment:
       If we put the container balancer inside SCM, the configuration is just the one loaded by SCM, so it cannot be reloaded without restarting SCM.






[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630113858



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);

Review comment:
       Makes sense. But shouldn't we return false then?






[GitHub] [ozone] siddhantsangwan commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-846954851


   > Because binary search can return index of any node with same utilisation.
   
   Yes, this case can be handled in another Jira. Thanks for pointing this out.
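The tie problem in the quoted comment can be shown concretely. `Collections.binarySearch` with a comparator that orders only by utilization returns the index of *some* element that compares equal, which may be a different node with the same utilization. The sketch below is hypothetical and is not the PR's `containsNode`. After a hit, it scans the run of equal-utilization neighbours and checks identity with `equals`. The generic `T` stands in for `DatanodeUsageInfo`.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class TieAwareSearch {
  private TieAwareSearch() { }

  /**
   * sortedList must be sorted according to cmp (e.g. most to least
   * utilized). A hit only proves some node has the same utilization.
   */
  static <T> boolean containsNode(List<T> sortedList, T node,
      Comparator<? super T> cmp) {
    int hit = Collections.binarySearch(sortedList, node, cmp);
    if (hit < 0) {
      return false;
    }
    // Scan left through the run of ties, including the hit itself.
    for (int i = hit; i >= 0 && cmp.compare(sortedList.get(i), node) == 0;
        i--) {
      if (sortedList.get(i).equals(node)) {
        return true;
      }
    }
    // Scan right through the rest of the run.
    for (int i = hit + 1; i < sortedList.size()
        && cmp.compare(sortedList.get(i), node) == 0; i++) {
      if (sortedList.get(i).equals(node)) {
        return true;
      }
    }
    return false;
  }
}
```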




[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630726276



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+
+    // calculate total number of nodes that have been balanced
+    numDatanodesBalanced =
+        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);
+
+      if (sourceNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(

Review comment:
       That's a great suggestion. The current implementation assumes that the order of nodes by utilization matters, so that the balancer can focus first on the nodes that are most over- or under-utilized. That's why a sorted list is used.
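The reasoning above (sort by utilization, handle the most extreme nodes first, and use binary search for membership) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the PR's implementation: `NodeUsage` is a hypothetical stand-in for `DatanodeUsageInfo`, `UtilizationSketch` and its methods are illustrative names, and nodes are ordered by used/capacity rather than by the `getMostUsedByRemainingRatio()` comparator the quoted code uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for DatanodeUsageInfo: used and total bytes. */
final class NodeUsage {
  final long used;
  final long capacity;

  NodeUsage(long used, long capacity) {
    this.used = used;
    this.capacity = capacity;
  }

  double utilization() {
    return (double) used / capacity;
  }
}

final class UtilizationSketch {
  // Assumed ordering: most utilized (used / capacity) first.
  static final Comparator<NodeUsage> MOST_USED_FIRST =
      Comparator.comparingDouble(NodeUsage::utilization).reversed();

  /** Cluster average utilization: total used / total capacity. */
  static double averageUtilization(List<NodeUsage> nodes) {
    long used = 0L;
    long capacity = 0L;
    for (NodeUsage n : nodes) {
      used += n.used;
      capacity += n.capacity;
    }
    // Floating-point division by zero yields NaN or Infinity rather than
    // throwing ArithmeticException, so the zero case is checked explicitly.
    if (capacity == 0L) {
      throw new IllegalArgumentException("cluster capacity is zero");
    }
    return (double) used / capacity;
  }

  /** Nodes above average + threshold, kept in most-used-first order. */
  static List<NodeUsage> overUtilized(List<NodeUsage> sorted, double threshold) {
    double upper = averageUtilization(sorted) + threshold;
    List<NodeUsage> over = new ArrayList<>();
    for (NodeUsage n : sorted) {
      if (n.utilization() > upper) {
        over.add(n);
      }
    }
    return over;
  }

  /** Nodes below average - threshold, reversed so the least used is first. */
  static List<NodeUsage> underUtilized(List<NodeUsage> sorted, double threshold) {
    double lower = averageUtilization(sorted) - threshold;
    List<NodeUsage> under = new ArrayList<>();
    for (NodeUsage n : sorted) {
      if (n.utilization() < lower) {
        under.add(n);
      }
    }
    Collections.reverse(under);
    return under;
  }

  /** Binary search on a list sorted most-used-first or least-used-first. */
  static boolean containsNode(List<NodeUsage> list, NodeUsage node) {
    if (list.isEmpty()) {
      return false; // list.get(0) below would throw on an empty list
    }
    Comparator<NodeUsage> cmp = MOST_USED_FIRST;
    if (cmp.compare(list.get(0), list.get(list.size() - 1)) > 0) {
      cmp = cmp.reversed(); // the list is ordered least used first
    }
    int index = Collections.binarySearch(list, node, cmp);
    if (index < 0) {
      return false;
    }
    // Nodes with equal utilization compare as equal, so scan that run of
    // equal keys in both directions for the exact node.
    for (int i = index; i >= 0 && cmp.compare(list.get(i), node) == 0; i--) {
      if (list.get(i).equals(node)) {
        return true;
      }
    }
    for (int i = index + 1;
        i < list.size() && cmp.compare(list.get(i), node) == 0; i++) {
      if (list.get(i).equals(node)) {
        return true;
      }
    }
    return false;
  }
}
```

Two choices differ from the quoted `containsNode`: the sketch returns early on an empty list, where `listToSearch.get(0)` would throw, and it scans neighbouring entries with equal utilization, because `Collections.binarySearch` may land on a different node that has the same key.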





[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r631636590



##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestContainerBalancer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancer.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManager;
+  private ContainerBalancer containerBalancer;
+  private MockNodeManager mockNodeManager;
+  private OzoneConfiguration conf;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    containerManager = Mockito.mock(ContainerManagerV2.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+
+    balancerConfiguration = new ContainerBalancerConfiguration();
+    balancerConfiguration.setThreshold("0.1");
+    balancerConfiguration.setMaxDatanodesToBalance(10);
+    balancerConfiguration.setMaxSizeToMove(500L);
+    conf.setFromObject(balancerConfiguration);
+
+    this.numberOfNodes = 10;
+    generateUtilizations(numberOfNodes);

Review comment:
       Done





[GitHub] [ozone] GlenGeng commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
GlenGeng commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r637873957



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " +
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;
+
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;
+
+  @Metric(about = "Number of containers that Container Balancer has moved" +
+      " until now.")
+  private LongMetric numContainersMoved;
+
+  @Metric(about = "The total number of datanodes that need to be balanced.")
+  private LongMetric totalNumDatanodesToBalance;
+
+  @Metric(about = "Number of datanodes that Container Balancer has balanced " +
+      "until now.")
+  private LongMetric numDatanodesBalanced;

Review comment:
       NIT: `numDatanodesBalanced` to `datanodeNumBalanced`

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+  @Metric(about = "The total number of datanodes that need to be balanced.")
+  private LongMetric totalNumDatanodesToBalance;

Review comment:
       NIT: `totalNumDatanodesToBalance` to `datanodeNumToBalance`

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+  @Metric(about = "Number of containers that Container Balancer has moved" +
+      " until now.")
+  private LongMetric numContainersMoved;

Review comment:
       NIT: `numContainersMoved` to `movedContainerNum`

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " +
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;

Review comment:
       NIT: `totalSizeToBalanceGB` to `dataSizeToBalanceGB`

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;

Review comment:
       NIT: `gigaBytesMoved` to `dataSizeBalancedGB`
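For illustration only, here is a hypothetical plain-Java holder that uses the names suggested in these NITs. It swaps Hadoop's metrics2 annotations and `LongMetric` for `java.util.concurrent.atomic.AtomicLong`, so it is not the real `ContainerBalancerMetrics`; `BalancerCountersSketch` and its methods are assumptions. It shows the pattern the balancer relies on when it calls `metrics.incrementNumDatanodesBalanced(...)`: add one iteration's count and get back the running total.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical counters named as suggested in review. The real class uses
 * Hadoop metrics2 annotations, which are not reproduced here.
 */
final class BalancerCountersSketch {
  private final AtomicLong dataSizeToBalanceGB = new AtomicLong();
  private final AtomicLong dataSizeBalancedGB = new AtomicLong();
  private final AtomicLong movedContainerNum = new AtomicLong();
  private final AtomicLong datanodeNumToBalance = new AtomicLong();
  private final AtomicLong datanodeNumBalanced = new AtomicLong();

  void setDataSizeToBalanceGB(long gb) {
    dataSizeToBalanceGB.set(gb);
  }

  void setDatanodeNumToBalance(long count) {
    datanodeNumToBalance.set(count);
  }

  long getDatanodeNumToBalance() {
    return datanodeNumToBalance.get();
  }

  /** Adds moved data and returns the cumulative amount in GB. */
  long incrementDataSizeBalancedGB(long gb) {
    return dataSizeBalancedGB.addAndGet(gb);
  }

  /** Adds moved containers and returns the cumulative count. */
  long incrementMovedContainerNum(long delta) {
    return movedContainerNum.addAndGet(delta);
  }

  /** Adds one iteration's balanced nodes and returns the running total. */
  long incrementDatanodeNumBalanced(long delta) {
    return datanodeNumBalanced.addAndGet(delta);
  }

  long getDatanodeNumBalanced() {
    return datanodeNumBalanced.get();
  }
}
```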

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,345 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
     ozoneConfiguration = new OzoneConfiguration();
-
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
+    this.unBalancedNodes = new ArrayList<>();
+
+    LOG.info("Starting Container Balancer...{}", this);
+    balance();
+    return true;
+  }
 
-    LOG.info("Starting Container Balancer...");
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    initializeIteration();
 
+    // unBalancedNodes is not cleared since the next iteration uses this
+    // iteration's unBalancedNodes to find out how many nodes were balanced
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    withinThresholdUtilizedNodes.clear();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under, and
+   * within threshold utilized nodes. Decides whether balancing needs to
+   * continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe Mode.");
+      return false;
+    }
     // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+    List<DatanodeUsageInfo> datanodeUsageInfos =
+        nodeManager.getMostOrLeastUsedDatanodes(true);
+    if (datanodeUsageInfos.isEmpty()) {
+      LOG.info("Container Balancer could not retrieve nodes from Node " +
+          "Manager.");
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
-    LOG.info("Lower limit for utilization is {}", lowerLimit);
-    LOG.info("Upper limit for utilization is {}", upperLimit);
-
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    double upperLimit = clusterAvgUtilisation + threshold;
+
+    LOG.info("Lower limit for utilization is {} and Upper limit for " +
+        "utilization is {}", lowerLimit, upperLimit);
+
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      double utilization = calculateUtilization(datanodeUsageInfo);
+      if (utilization > upperLimit) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes greater than upper limit in this node
+        overLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+      } else if (utilization < lowerLimit) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes lesser than lower limit in this node
+        underLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    Collections.reverse(underUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : unBalancedNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    // calculate total number of nodes that have been balanced so far
+    numDatanodesBalanced =
+        metrics.incrementNumDatanodesBalanced(numDatanodesBalanced);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int index = 0;
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(listToSearch.size() - 1)) < 0) {
+      index =
+          Collections.binarySearch(listToSearch, node, comparator.reversed());
+    } else {
+      index = Collections.binarySearch(listToSearch, node, comparator);
+    }
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average datanode utilization for the specified nodes.
+   * Utilization is used space divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   * @throws ArithmeticException Division by zero
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes)
+      throws ArithmeticException {
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    try {
+      return clusterUsed / (double) clusterCapacity;
+    } catch (ArithmeticException e) {
+      LOG.warn("Division by zero while calculating average utilization of the" +
+          " cluster in ContainerBalancer.");
+      throw e;
+    }
+  }
+
+  /**
+   * Calculates the utilization, that is used space divided by capacity, for
+   * the given datanodeUsageInfo.
+   *
+   * @param datanodeUsageInfo DatanodeUsageInfo to calculate utilization for
+   * @return Utilization value
+   */
+  private double calculateUtilization(DatanodeUsageInfo datanodeUsageInfo) {
+    SCMNodeStat stat = datanodeUsageInfo.getScmNodeStat();
+
+    return stat.getScmUsed().get().doubleValue() /
+        stat.getCapacity().get().doubleValue();
   }
 
+  /**
+   * Stops ContainerBalancer.
+   */
   public void stop() {
     LOG.info("Stopping Container Balancer...");
-    balancerRunning = false;
+    balancerRunning.set(false);

Review comment:
       NIT: remove line 319. One info log line is sufficient here, since no actual work needs to be done.
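A minimal sketch of a `stop()` with a single log line, next to the `compareAndSet` guard that `start()` uses in the diff. It assumes `java.util.logging` in place of the SLF4J logger in the real class, and `BalancerLifecycleSketch` is a hypothetical name. Making `stop()` a no-op when the balancer is not running is a design choice of this sketch, not something the PR specifies.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

final class BalancerLifecycleSketch {
  private static final Logger LOG =
      Logger.getLogger(BalancerLifecycleSketch.class.getName());

  private final AtomicBoolean running = new AtomicBoolean(false);

  /** Returns false, without side effects, if already running. */
  boolean start() {
    if (!running.compareAndSet(false, true)) {
      LOG.severe("Container Balancer is already running.");
      return false;
    }
    LOG.info("Starting Container Balancer...");
    return true;
  }

  /** Stopping only clears a flag, so one info line is enough. */
  void stop() {
    if (running.compareAndSet(true, false)) {
      LOG.info("Stopping Container Balancer...");
    }
  }

  boolean isRunning() {
    return running.get();
  }
}
```

Because both transitions go through `compareAndSet`, concurrent callers cannot start the balancer twice, and a repeated `stop()` does not log a second time.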

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,345 @@
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      double utilization = calculateUtilization(datanodeUsageInfo);
+      if (utilization > upperLimit) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes greater than upper limit in this node
+        overLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+      } else if (utilization < lowerLimit) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes lesser than lower limit in this node
+        underLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    Collections.reverse(underUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : unBalancedNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    // calculate total number of nodes that have been balanced so far
+    numDatanodesBalanced =
+        metrics.incrementNumDatanodesBalanced(numDatanodesBalanced);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int index = 0;
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(listToSearch.size() - 1)) < 0) {
+      index =
+          Collections.binarySearch(listToSearch, node, comparator.reversed());
+    } else {
+      index = Collections.binarySearch(listToSearch, node, comparator);
+    }
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average datanode utilization for the specified nodes.
+   * Utilization is used space divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   * @throws ArithmeticException Division by zero
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes)
+      throws ArithmeticException {
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    try {
+      return clusterUsed / (double) clusterCapacity;
+    } catch (ArithmeticException e) {
+      LOG.warn("Division by zero while calculating average utilization of the" +
+          " cluster in ContainerBalancer.");
+      throw e;
+    }
+  }
+
+  /**
+   * Calculates the utilization, that is used space divided by capacity, for
+   * the given datanodeUsageInfo.
+   *
+   * @param datanodeUsageInfo DatanodeUsageInfo to calculate utilization for
+   * @return Utilization value
+   */
+  private double calculateUtilization(DatanodeUsageInfo datanodeUsageInfo) {

Review comment:
      NIT: make it a static helper function.
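A minimal sketch of the suggested change. It assumes, per its Javadoc, that `calculateUtilization` returns used space divided by capacity; its body is not shown in the quoted diff. `NodeStat` and `UtilizationHelpers` are hypothetical names standing in for `SCMNodeStat`/`DatanodeUsageInfo`. Neither helper reads instance fields, so both can be `static`, which also makes them easy to unit test in isolation.

```java
/** Hypothetical stand-in for SCMNodeStat: capacity and used bytes only. */
final class NodeStat {
  final long capacity;
  final long used;

  NodeStat(long capacity, long used) {
    this.capacity = capacity;
    this.used = used;
  }
}

/** Hypothetical holder for stateless helpers, which can therefore be static. */
final class UtilizationHelpers {
  private UtilizationHelpers() {
    // utility class: not instantiable
  }

  /** Utilization is used space divided by capacity. */
  static double calculateUtilization(NodeStat stat) {
    return stat.used / (double) stat.capacity;
  }

  /** Bytes that correspond to the given utilization ratio of a capacity. */
  static double ratioToBytes(long capacity, double utilizationRatio) {
    return capacity * utilizationRatio;
  }

  /** Bytes above the upper limit, mirroring the overLoadedBytes term. */
  static double bytesAboveLimit(NodeStat stat, double upperLimit) {
    return ratioToBytes(stat.capacity, calculateUtilization(stat))
        - ratioToBytes(stat.capacity, upperLimit);
  }
}
```

For example, a node with 1000 bytes of capacity and 900 bytes used, checked against an upper limit of 0.7, has about 200 bytes above the limit.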

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,345 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
     ozoneConfiguration = new OzoneConfiguration();
-
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
+    this.unBalancedNodes = new ArrayList<>();
+
+    LOG.info("Starting Container Balancer...{}", this);
+    balance();
+    return true;
+  }
 
-    LOG.info("Starting Container Balancer...");
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    initializeIteration();
 
+    // unBalancedNodes is not cleared since the next iteration uses this
+    // iteration's unBalancedNodes to find out how many nodes were balanced
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    withinThresholdUtilizedNodes.clear();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under, and
+   * within threshold utilized nodes. Decides whether balancing needs to
+   * continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe Mode.");
+      return false;
+    }
     // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+    List<DatanodeUsageInfo> datanodeUsageInfos =
+        nodeManager.getMostOrLeastUsedDatanodes(true);
+    if (datanodeUsageInfos.isEmpty()) {
+      LOG.info("Container Balancer could not retrieve nodes from Node " +
+          "Manager.");
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
-    LOG.info("Lower limit for utilization is {}", lowerLimit);
-    LOG.info("Upper limit for utilization is {}", upperLimit);
-
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    double upperLimit = clusterAvgUtilisation + threshold;
+
+    LOG.info("Lower limit for utilization is {} and Upper limit for " +
+        "utilization is {}", lowerLimit, upperLimit);
+
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      double utilization = calculateUtilization(datanodeUsageInfo);
+      if (utilization > upperLimit) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes greater than upper limit in this node
+        overLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+      } else if (utilization < lowerLimit) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes lesser than lower limit in this node
+        underLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    Collections.reverse(underUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : unBalancedNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    // calculate total number of nodes that have been balanced so far
+    numDatanodesBalanced =
+        metrics.incrementNumDatanodesBalanced(numDatanodesBalanced);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int index = 0;
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(listToSearch.size() - 1)) < 0) {
+      index =
+          Collections.binarySearch(listToSearch, node, comparator.reversed());
+    } else {
+      index = Collections.binarySearch(listToSearch, node, comparator);
+    }
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average datanode utilization for the specified nodes.
+   * Utilization is used space divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   * @throws ArithmeticException Division by zero
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes)
+      throws ArithmeticException {
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    try {
+      return clusterUsed / (double) clusterCapacity;
+    } catch (ArithmeticException e) {

Review comment:
      NIT: better not to handle `ArithmeticException`; instead, check `nodes.size() != 0` at the entrance of the function.
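For context, in Java floating-point division by zero never throws `ArithmeticException`; only integer division and remainder do. So `clusterUsed / (double) clusterCapacity` evaluates to `NaN` when both values are zero and to `Infinity` when only the capacity is zero, and the `catch` block above cannot run. Below is a sketch of validating at the entrance instead. `UsageStat` is a hypothetical stand-in for `SCMNodeStat`, and the zero-capacity check is an assumption that goes one step beyond the suggested `nodes.size() != 0` check.

```java
import java.util.List;

/** Hypothetical sketch, not Ozone code. */
final class AvgUtilization {
  private AvgUtilization() {
  }

  /** Hypothetical per-node usage: capacity and used bytes. */
  static final class UsageStat {
    final long capacity;
    final long used;

    UsageStat(long capacity, long used) {
      this.capacity = capacity;
      this.used = used;
    }
  }

  /**
   * Cluster-wide utilization: total used divided by total capacity.
   * Inputs are validated up front. No ArithmeticException handling is
   * needed, because double division by zero returns NaN or Infinity
   * instead of throwing.
   */
  static double calculateAvgUtilization(List<UsageStat> nodes) {
    if (nodes == null || nodes.isEmpty()) {
      throw new IllegalArgumentException("No datanodes to compute utilization for");
    }
    long totalCapacity = 0L;
    long totalUsed = 0L;
    for (UsageStat node : nodes) {
      totalCapacity += node.capacity;
      totalUsed += node.used;
    }
    if (totalCapacity <= 0L) {
      throw new IllegalArgumentException("Total capacity must be positive");
    }
    return totalUsed / (double) totalCapacity;
  }
}
```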

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " +
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;
+
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;
+
+  @Metric(about = "Number of containers that Container Balancer has moved" +
+      " until now.")
+  private LongMetric numContainersMoved;
+
+  @Metric(about = "The total number of datanodes that need to be balanced.")
+  private LongMetric totalNumDatanodesToBalance;
+
+  @Metric(about = "Number of datanodes that Container Balancer has balanced " +
+      "until now.")
+  private LongMetric numDatanodesBalanced;
+
+  @Metric(about = "Utilisation value of the current maximum utilised datanode.")
+  private double maxUtilizedDatanodeRatio;

Review comment:
      NIT: rename `maxUtilizedDatanodeRatio` to `maxDatanodeUtilizedRatio`.
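In the quoted `initializeIteration()`, the count of nodes balanced in the previous iteration is passed to `metrics.incrementNumDatanodesBalanced(...)`. The return value is treated as the total balanced so far before the `maxDatanodesToBalance` check. That method's body is not part of the quoted diff. The sketch below assumes it adds to a running total and returns the new value, using `AtomicLong` in place of Ozone's `LongMetric`. `BalancedNodesCounter` and `shouldStop` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the numDatanodesBalanced bookkeeping. */
final class BalancedNodesCounter {
  private final AtomicLong numDatanodesBalanced = new AtomicLong();

  /** Adds the nodes balanced in the last iteration; returns the new total. */
  long incrementNumDatanodesBalanced(long balancedInLastIteration) {
    return numDatanodesBalanced.addAndGet(balancedInLastIteration);
  }

  /**
   * Stop rule as quoted in initializeIteration(): stop once the nodes
   * balanced so far plus the nodes still to balance exceed the limit.
   */
  static boolean shouldStop(long totalBalanced, long numDatanodesToBalance,
      int maxDatanodesToBalance) {
    return totalBalanced + numDatanodesToBalance > maxDatanodesToBalance;
  }
}
```

Take a limit of 10 with 5 nodes already balanced. Finding 6 newly unbalanced nodes would stop the balancer, while finding 5 would not, because the comparison is strictly greater-than.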






[GitHub] [ozone] siddhantsangwan commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-846904392


   > @siddhantsangwan Thanks for updating the PR! The changes look good to me. +1.
   > 
   > Can you create another jira for [#2230 (comment)](https://github.com/apache/ozone/pull/2230#discussion_r630797680) or handle it in the next jira?
   
   I have handled that case in the `containsNode` method. Do you mean changing the comparator itself?
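Two properties of `Collections.binarySearch` matter for `containsNode`. First, the list must be in ascending order according to the comparator passed in. Second, when several elements compare equal (for example, nodes with the same remaining ratio), there is no guarantee which one is found. In the quoted version, `listToSearch.get(0)` also throws on an empty list. Its direction check also appears to pick `comparator.reversed()` for a list that is already ascending under `comparator`. Below is a sketch of a generic variant that handles all three cases. `NodeSearch` is a hypothetical name, and the code is not taken from the PR.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper; a sketch, not the PR's implementation. */
final class NodeSearch {
  private NodeSearch() {
  }

  /**
   * Returns true if node is in list, where list is sorted by comparator in
   * either direction. Handles empty lists and comparator ties.
   */
  static <T> boolean containsNode(List<T> list, T node, Comparator<T> comparator) {
    if (list.isEmpty()) {
      return false; // list.get(0) below would otherwise throw
    }
    // binarySearch needs ascending order under the comparator it is given,
    // so reverse the comparator only if the list is descending under it.
    Comparator<T> order =
        comparator.compare(list.get(0), list.get(list.size() - 1)) <= 0
            ? comparator : comparator.reversed();
    int index = Collections.binarySearch(list, node, order);
    if (index < 0) {
      return false;
    }
    // Several elements may compare equal; check the whole run of ties.
    for (int i = index; i >= 0 && order.compare(list.get(i), node) == 0; i--) {
      if (list.get(i).equals(node)) {
        return true;
      }
    }
    for (int i = index + 1;
        i < list.size() && order.compare(list.get(i), node) == 0; i++) {
      if (list.get(i).equals(node)) {
        return true;
      }
    }
    return false;
  }
}
```

Scanning the run of ties keeps the lookup logarithmic except when many nodes share the same ratio. If the lists are small, a plain linear scan or a `HashSet` keyed on node identity would be a simpler alternative.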




[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629850890



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,305 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    boolean initialized = initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);

Review comment:
      Consider the case where there are no overUtilizedNodes in the cluster, and only underUtilizedNodes and other nodes with utilization within the limits are present. Then the underUtilizedNodes need to be balanced, and they become the source nodes to which data will be moved.
   
   So here, the term 'source nodes' refers to the nodes that need to be balanced. Target nodes will be chosen from the over, above-average, under, or below-average utilized nodes as necessary. Do you have another approach in mind?
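The four-way classification in the revision quoted above can be sketched as follows, together with the reply's use of 'source nodes' for over- and under-utilized nodes. Nodes are reduced to bare utilization values, and `UtilizationBuckets` is a hypothetical name. As in the quoted loop, a node whose utilization equals the cluster average falls into none of the lists.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: nodes are represented only by their utilization. */
final class UtilizationBuckets {
  final List<Double> overUtilized = new ArrayList<>();
  final List<Double> aboveAverage = new ArrayList<>();
  final List<Double> belowAverage = new ArrayList<>();
  final List<Double> underUtilized = new ArrayList<>();

  /** Classifies utilizations against avg - threshold and avg + threshold. */
  static UtilizationBuckets classify(List<Double> utilizations,
      double avgUtilization, double threshold) {
    UtilizationBuckets buckets = new UtilizationBuckets();
    double lowerLimit = avgUtilization - threshold;
    double upperLimit = avgUtilization + threshold;
    for (double u : utilizations) {
      if (u > avgUtilization) {
        if (u > upperLimit) {
          buckets.overUtilized.add(u);
        } else {
          buckets.aboveAverage.add(u);
        }
      } else if (u < avgUtilization) {
        if (u < lowerLimit) {
          buckets.underUtilized.add(u);
        } else {
          buckets.belowAverage.add(u);
        }
      }
      // u == avgUtilization falls into no bucket, as in the quoted code.
    }
    return buckets;
  }

  /** Nodes that need balancing: over-utilized followed by under-utilized. */
  List<Double> sourceNodes() {
    List<Double> source = new ArrayList<>(overUtilized);
    source.addAll(underUtilized);
    return source;
  }
}
```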






[GitHub] [ozone] siddhantsangwan commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-846956814


   @JacksonYao287 and @linyiqun, can you please review the changes? Any comments are welcome.




[GitHub] [ozone] JacksonYao287 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
JacksonYao287 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r632926361



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,341 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
-
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe Mode.");
+      return false;
+    }
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
-    LOG.info("Starting Container Balancer...");
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();

Review comment:
       Maybe it is better to put these initialization operations in the constructor, so that the start function only does the start work.
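A minimal sketch of the split suggested here, assuming simplified types: node lists hold plain `String` ids and safe mode is a `BooleanSupplier` (both hypothetical stand-ins, not the real `SCMContext`/`DatanodeUsageInfo` API). One-time setup moves into the constructor, and `start` only checks preconditions, applies configuration and flips the running flag. The sketch also checks safe mode before `compareAndSet`; in the quoted hunk the flag is claimed first, so it appears a safe-mode refusal would return `false` while leaving `balancerRunning` set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified stand-in for ContainerBalancer's lifecycle.
class BalancerLifecycleSketch {
  private final BooleanSupplier inSafeMode;
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final List<String> overUtilizedNodes;
  private final List<String> underUtilizedNodes;
  private long clusterCapacity;
  private long clusterUsed;
  private double threshold;

  BalancerLifecycleSketch(BooleanSupplier inSafeMode) {
    this.inSafeMode = inSafeMode;
    // One-time setup lives in the constructor rather than in start().
    this.overUtilizedNodes = new ArrayList<>();
    this.underUtilizedNodes = new ArrayList<>();
    this.clusterCapacity = 0L;
    this.clusterUsed = 0L;
  }

  /** Checks preconditions, applies configuration and marks the balancer running. */
  boolean start(double threshold) {
    // Refuse before claiming the flag, so a refusal leaves it unset.
    if (inSafeMode.getAsBoolean()) {
      return false;
    }
    // Atomically claim the running flag; fails if a previous start already won.
    if (!running.compareAndSet(false, true)) {
      return false;
    }
    this.threshold = threshold;
    return true;
  }

  void stop() {
    running.set(false);
  }

  boolean isRunning() {
    return running.get();
  }

  double getThreshold() {
    return threshold;
  }
}
```

With this shape, a second `start` while running is rejected without touching the configuration, and `stop` makes a later `start` possible again.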






[GitHub] [ozone] linyiqun commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
linyiqun commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629474188



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,305 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    boolean initialized = initializeIteration();

Review comment:
       Before initializing the iteration, can we also clear the related node lists, such as overUtilizedNodes/underUtilizedNodes? It would be easier to understand if the list-clearing logic were part of the balance method rather than being done outside of it.
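A sketch of what this suggestion could look like, assuming utilizations are plain `double` ratios and the average is an unweighted mean (both simplifications; the class and its method bodies are hypothetical). `balance()` owns the reset, so no caller has to remember to clear the lists before a new iteration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: balance() resets per-iteration state itself.
class IterationResetSketch {
  final List<Double> overUtilizedNodes = new ArrayList<>();
  final List<Double> underUtilizedNodes = new ArrayList<>();
  final List<Double> withinThresholdUtilizedNodes = new ArrayList<>();

  boolean balance(List<Double> utilizations, double threshold) {
    // Drop whatever the previous iteration left behind before re-classifying.
    overUtilizedNodes.clear();
    underUtilizedNodes.clear();
    withinThresholdUtilizedNodes.clear();
    return initializeIteration(utilizations, threshold);
  }

  /** Returns true if at least one node lies outside the threshold band. */
  private boolean initializeIteration(List<Double> utilizations, double threshold) {
    if (utilizations.isEmpty()) {
      return false;
    }
    double sum = 0D;
    for (double u : utilizations) {
      sum += u;
    }
    double avg = sum / utilizations.size();
    for (double u : utilizations) {
      if (u > avg + threshold) {
        overUtilizedNodes.add(u);
      } else if (u < avg - threshold) {
        underUtilizedNodes.add(u);
      } else {
        withinThresholdUtilizedNodes.add(u);
      }
    }
    return !overUtilizedNodes.isEmpty() || !underUtilizedNodes.isEmpty();
  }
}
```

Calling `balance` twice on the same input leaves each list with the same contents as after the first call, instead of accumulating duplicates.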

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,305 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    boolean initialized = initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);

Review comment:
       I don't fully get this. Why are underUtilizedNodes added to the source list rather than to the target nodes, as the original logic did?
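For reference, a hypothetical sketch of the classification step in the hunk above, with each node reduced to a capacity and a used byte count (the real code reads these from `DatanodeUsageInfo`/`SCMNodeStat`). Only over-utilized nodes have data to give away; under-utilized nodes are the ones that would receive it, so a single list holding both means "nodes needing balancing" rather than "sources". The byte amounts follow the `ratioToBytes(capacity, a) - ratioToBytes(capacity, b)` pattern, assuming `ratioToBytes` multiplies capacity by the ratio. Above- and below-average nodes inside the band are ignored here for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of over/under classification with byte amounts.
class NodeClassificationSketch {
  static final class Node {
    final String id;
    final long capacity;
    final long used;

    Node(String id, long capacity, long used) {
      this.id = id;
      this.capacity = capacity;
      this.used = used;
    }

    double utilization() {
      return (double) used / capacity;
    }
  }

  final List<Node> overUtilized = new ArrayList<>();  // data should leave these
  final List<Node> underUtilized = new ArrayList<>(); // data should arrive here
  double overLoadedBytes = 0D;  // sum of bytes above the upper limit
  double underLoadedBytes = 0D; // sum of bytes missing below the lower limit

  void classify(List<Node> nodes, double avg, double threshold) {
    double lowerLimit = avg - threshold;
    double upperLimit = avg + threshold;
    for (Node node : nodes) {
      double u = node.utilization();
      if (u > upperLimit) {
        overUtilized.add(node);
        // capacity * u - capacity * upperLimit
        overLoadedBytes += node.capacity * (u - upperLimit);
      } else if (u < lowerLimit) {
        underUtilized.add(node);
        // capacity * lowerLimit - capacity * u
        underLoadedBytes += node.capacity * (lowerLimit - u);
      }
    }
  }
}
```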






[GitHub] [ozone] GlenGeng commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
GlenGeng commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630705685



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+

Review comment:
       Question:
   
   Say we have a 10-DN cluster where every DN is at 95% usage, and then one empty DN is added to rebalance the cluster. Given a threshold of 10%, it seems the balancer will not work in this case, since those 10 DNs will not exceed the upperLimit.
   
   Have we considered corner cases like this?
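The arithmetic behind this question can be checked with a small sketch, assuming all 11 datanodes have equal capacity and that the average is cluster-wide used over cluster-wide capacity. The helper is hypothetical, not the PR's code.

```java
// Hypothetical helper reproducing the corner-case arithmetic.
class CornerCaseSketch {
  /** Returns {average, lowerLimit, upperLimit} for cluster-wide utilization. */
  static double[] limits(long[] capacity, long[] used, double threshold) {
    long totalCapacity = 0L;
    long totalUsed = 0L;
    for (int i = 0; i < capacity.length; i++) {
      totalCapacity += capacity[i];
      totalUsed += used[i];
    }
    double avg = (double) totalUsed / totalCapacity;
    return new double[] {avg, avg - threshold, avg + threshold};
  }

  /** Counts nodes whose utilization is strictly above the limit. */
  static int countAbove(long[] capacity, long[] used, double limit) {
    int count = 0;
    for (int i = 0; i < capacity.length; i++) {
      if ((double) used[i] / capacity[i] > limit) {
        count++;
      }
    }
    return count;
  }

  /** Counts nodes whose utilization is strictly below the limit. */
  static int countBelow(long[] capacity, long[] used, double limit) {
    int count = 0;
    for (int i = 0; i < capacity.length; i++) {
      if ((double) used[i] / capacity[i] < limit) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    long[] capacity = new long[11];
    long[] used = new long[11];
    for (int i = 0; i < 11; i++) {
      capacity[i] = 100L;          // equal capacities (assumption)
      used[i] = i < 10 ? 95L : 0L; // ten nodes at 95%, one empty node
    }
    double[] l = limits(capacity, used, 0.10);
    System.out.printf("avg=%.3f lower=%.3f upper=%.3f over=%d under=%d%n",
        l[0], l[1], l[2],
        countAbove(capacity, used, l[2]), countBelow(capacity, used, l[1]));
  }
}
```

Under these assumptions the average is 950/1100 ≈ 0.864, so the limits are roughly 0.764 and 0.964. The empty node falls below the lower limit, but no 95% node exceeds the upper limit, so there is no over-utilized node to move data from.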






[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r639555281



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,347 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {

Review comment:
       I see. We will not need the configuration parameter in that case. I see that this change is made in #2278.






[GitHub] [ozone] siddhantsangwan commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-839668262


   Changed the term `source` nodes to `unBalanced` nodes to avoid ambiguity: a source node would normally be read as "a node from which data is leaving", but this list also holds under-utilized nodes. @linyiqun
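A hedged sketch of how the renamed list could be used between iterations, modelled on the quoted `initializeIteration` hunk: `unBalancedNodes` collects both over- and under-utilized nodes, a node counts as balanced once it has left both lists, and balancing stops when balanced plus still-to-balance nodes would exceed `maxDatanodesToBalance`. Node ids are plain strings and the helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helpers around the unBalancedNodes list.
class UnbalancedNodesSketch {
  /** Nodes needing balancing: over-utilized first, then under-utilized. */
  static List<String> unBalancedNodes(List<String> over, List<String> under) {
    List<String> result = new ArrayList<>(over.size() + under.size());
    result.addAll(over);
    result.addAll(under);
    return result;
  }

  /** Previously unbalanced nodes that are now neither over- nor under-utilized. */
  static long countNewlyBalanced(Collection<String> previous,
      Collection<String> over, Collection<String> under) {
    return previous.stream()
        .filter(n -> !over.contains(n) && !under.contains(n))
        .count();
  }

  /** Continue only while balanced + still-to-balance stays within the cap. */
  static boolean shouldContinue(long balanced, long toBalance,
      int maxDatanodesToBalance) {
    return balanced + toBalance <= maxDatanodesToBalance;
  }
}
```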




[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r631637758



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {

Review comment:
       Using `calculateAvgUtilization`.
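A minimal sketch of such a helper, assuming it computes cluster-wide used bytes over cluster-wide capacity (the `Usage` type is a hypothetical stand-in for `DatanodeUsageInfo`). One detail relevant to the `catch (ArithmeticException e)` in one revision of `initializeIteration`: in Java, dividing doubles by zero yields `NaN` or `Infinity` instead of throwing, so the helper has to throw explicitly for that catch to ever fire.

```java
import java.util.List;

// Hypothetical sketch of calculateAvgUtilization over a simplified node type.
class AvgUtilizationSketch {
  static final class Usage {
    final long capacity;
    final long used;

    Usage(long capacity, long used) {
      this.capacity = capacity;
      this.used = used;
    }
  }

  static double calculateAvgUtilization(List<Usage> nodes) {
    long clusterCapacity = 0L;
    long clusterUsed = 0L;
    for (Usage u : nodes) {
      clusterCapacity += u.capacity;
      clusterUsed += u.used;
    }
    if (clusterCapacity == 0L) {
      // Double division would silently produce NaN here, so fail loudly.
      throw new ArithmeticException("Cluster capacity is zero");
    }
    return (double) clusterUsed / clusterCapacity;
  }
}
```

Weighting by capacity this way means a large, lightly used node pulls the average down more than a small one does.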







[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629850890



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,305 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    boolean initialized = initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);

Review comment:
       Consider the case where there are no overUtilizedNodes in the cluster, only underUtilizedNodes and nodes whose utilization is within the limits. The underUtilizedNodes still need to be balanced, so they become the source nodes, that is, the nodes to which data will be moved.
   
   So the term 'source nodes' is used here for nodes that need to be balanced. Target nodes will be chosen from the over-utilized, above-average, under-utilized, or below-average lists as necessary. Do you have another approach in mind?
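
A minimal Java sketch of the classification described in this comment, following the later revision of `initializeIteration()` (over-utilized, under-utilized, and within-threshold lists). `NodeUsage` is a hypothetical stand-in for `DatanodeUsageInfo`, and the cluster average is passed in rather than computed. The per-node byte counts use the factored form of `ratioToBytes(capacity, utilization) - ratioToBytes(capacity, upperLimit)`. In the usage below no node is over-utilized, so the single under-utilized node alone makes up the list of nodes that need balancing.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: classifies nodes against avg +/- threshold. */
class NodeClassificationSketch {

  /** Hypothetical stand-in for DatanodeUsageInfo: capacity and used bytes. */
  static final class NodeUsage {
    final String id;
    final long capacity;
    final long used;

    NodeUsage(String id, long capacity, long used) {
      this.id = id;
      this.capacity = capacity;
      this.used = used;
    }

    double utilization() {
      return used / (double) capacity;
    }
  }

  final List<NodeUsage> overUtilized = new ArrayList<>();
  final List<NodeUsage> underUtilized = new ArrayList<>();
  final List<NodeUsage> withinThreshold = new ArrayList<>();
  double overLoadedBytes = 0D;
  double underLoadedBytes = 0D;

  /**
   * Sorts nodes into the three lists and returns the nodes that need
   * balancing: over-utilized first, then under-utilized.
   */
  List<NodeUsage> classify(List<NodeUsage> nodes, double avg, double threshold) {
    double lowerLimit = avg - threshold;
    double upperLimit = avg + threshold;
    for (NodeUsage node : nodes) {
      double utilization = node.utilization();
      if (utilization > upperLimit) {
        overUtilized.add(node);
        // bytes above the upper limit: capacity * (utilization - upperLimit)
        overLoadedBytes += node.capacity * (utilization - upperLimit);
      } else if (utilization < lowerLimit) {
        underUtilized.add(node);
        // bytes below the lower limit: capacity * (lowerLimit - utilization)
        underLoadedBytes += node.capacity * (lowerLimit - utilization);
      } else {
        withinThreshold.add(node);
      }
    }
    List<NodeUsage> unbalanced = new ArrayList<>(overUtilized);
    unbalanced.addAll(underUtilized);
    return unbalanced;
  }
}
```

With three nodes at 40% and one at 0% (average 0.3, threshold 0.15), the limits are roughly 0.15 and 0.45, so only the empty node is returned and about 15 bytes per 100 of its capacity count as under-loaded.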




[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630086048



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+

Review comment:
       A negative lower limit and an upper limit greater than 1 will not lead to errors (as checked by the unit test). However, in that case we could return false early: every node's utilization then lies within the limits, so the cluster is already balanced.
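
A small sketch of the early-return check suggested here, assuming utilization (used / capacity) always lies in [0, 1]. Because a node is under-utilized only if `utilization < lowerLimit` and over-utilized only if `utilization > upperLimit`, a negative lower limit together with an upper limit above 1 means no node can fall outside the limits. Since this requires `avg < threshold` and `avg > 1 - threshold`, it can only happen when `threshold > 0.5`. The class and method names are hypothetical.

```java
/** Sketch only: detects limits that no utilization in [0, 1] can violate. */
class LimitsSketch {

  /** True if every utilization in [0, 1] is within avg +/- threshold. */
  static boolean cannotFindUnbalancedNodes(double clusterAvgUtilization,
      double threshold) {
    double lowerLimit = clusterAvgUtilization - threshold;
    double upperLimit = clusterAvgUtilization + threshold;
    return lowerLimit < 0 && upperLimit > 1;
  }

  /** Same limit test used when classifying a single node. */
  static boolean isWithinLimits(double utilization, double avg,
      double threshold) {
    return utilization >= avg - threshold && utilization <= avg + threshold;
  }
}
```

For example, with an average of 0.5 and a threshold of 0.6 the limits are -0.1 and 1.1, so both an empty node (0.0) and a full node (1.0) are within limits and the iteration could stop immediately.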




[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r638690164



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,345 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
     ozoneConfiguration = new OzoneConfiguration();
-
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
+    this.unBalancedNodes = new ArrayList<>();
+
+    LOG.info("Starting Container Balancer...{}", this);
+    balance();
+    return true;
+  }
 
-    LOG.info("Starting Container Balancer...");
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    initializeIteration();
 
+    // unBalancedNodes is not cleared since the next iteration uses this
+    // iteration's unBalancedNodes to find out how many nodes were balanced
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    withinThresholdUtilizedNodes.clear();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under, and
+   * within threshold utilized nodes. Decides whether balancing needs to
+   * continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe Mode.");
+      return false;
+    }
     // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+    List<DatanodeUsageInfo> datanodeUsageInfos =
+        nodeManager.getMostOrLeastUsedDatanodes(true);
+    if (datanodeUsageInfos.isEmpty()) {
+      LOG.info("Container Balancer could not retrieve nodes from Node " +
+          "Manager.");
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
-    LOG.info("Lower limit for utilization is {}", lowerLimit);
-    LOG.info("Upper limit for utilization is {}", upperLimit);
-
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    double upperLimit = clusterAvgUtilisation + threshold;
+
+    LOG.info("Lower limit for utilization is {} and Upper limit for " +
+        "utilization is {}", lowerLimit, upperLimit);
+
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      double utilization = calculateUtilization(datanodeUsageInfo);
+      if (utilization > upperLimit) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes greater than upper limit in this node
+        overLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+      } else if (utilization < lowerLimit) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes lesser than lower limit in this node
+        underLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    Collections.reverse(underUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : unBalancedNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    // calculate total number of nodes that have been balanced so far
+    numDatanodesBalanced =
+        metrics.incrementNumDatanodesBalanced(numDatanodesBalanced);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int index = 0;
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(listToSearch.size() - 1)) < 0) {
+      index =
+          Collections.binarySearch(listToSearch, node, comparator.reversed());
+    } else {
+      index = Collections.binarySearch(listToSearch, node, comparator);
+    }
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average datanode utilization for the specified nodes.
+   * Utilization is used space divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   * @throws ArithmeticException Division by zero
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes)
+      throws ArithmeticException {
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    try {
+      return clusterUsed / (double) clusterCapacity;
+    } catch (ArithmeticException e) {

Review comment:
       Updated PR. Please take a look.
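
One detail in the `calculateAvgUtilization` hunk above: in Java, `ArithmeticException` is thrown only for integer division by zero. Dividing a `double` by zero yields `Infinity`, or `NaN` for 0/0, so `clusterUsed / (double) clusterCapacity` would not reach the `catch` block when the total capacity is zero. A minimal sketch of an explicit guard, using a hypothetical `NodeStat` class in place of `SCMNodeStat` and `OptionalDouble` to signal "no capacity":

```java
import java.util.List;
import java.util.OptionalDouble;

/** Sketch only: cluster-wide used / capacity with an explicit zero check. */
class AvgUtilizationSketch {

  /** Hypothetical per-node stats: capacity and used bytes. */
  static final class NodeStat {
    final long capacity;
    final long used;

    NodeStat(long capacity, long used) {
      this.capacity = capacity;
      this.used = used;
    }
  }

  /** Returns empty when the aggregated capacity is zero. */
  static OptionalDouble averageUtilization(List<NodeStat> nodes) {
    long capacity = 0L;
    long used = 0L;
    for (NodeStat n : nodes) {
      capacity += n.capacity;
      used += n.used;
    }
    if (capacity == 0L) {
      // 0 / 0.0 would silently produce NaN rather than throw
      return OptionalDouble.empty();
    }
    return OptionalDouble.of(used / (double) capacity);
  }
}
```

The average is weighted by capacity, as in the diff: nodes with 30 of 100 and 70 of 300 bytes used give 100 / 400 = 0.25, not the mean of 0.3 and 0.233.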




[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630080647



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());

Review comment:
       Okay.
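
The `start()` in this hunk checks a plain `boolean` and then sets it, throwing a bare `RuntimeException` if the balancer is already running. The revision quoted for discussion_r638690164 in this thread instead uses `AtomicBoolean.compareAndSet(false, true)` and returns `false`. A minimal sketch of that pattern, with a hypothetical `StartGuardSketch` class:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch only: an idempotent start/stop guard. */
class StartGuardSketch {
  private final AtomicBoolean running = new AtomicBoolean(false);

  /** Returns false, instead of throwing, if already running. */
  boolean start() {
    // compareAndSet(false, true) tests and sets the flag in one atomic step,
    // so when two threads call start() concurrently only one of them wins.
    if (!running.compareAndSet(false, true)) {
      return false;
    }
    // a real balancer would read its configuration and begin balancing here
    return true;
  }

  void stop() {
    running.set(false);
  }

  boolean isRunning() {
    return running.get();
  }
}
```

With a separate read and write of a plain field, two callers could both observe `false` before either sets it; the atomic version closes that window.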

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;

Review comment:
       Yes, changing that.
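
Between iterations, the balancer counts how many of the previous iteration's unbalanced nodes are now neither over- nor under-utilized, and stops once that count plus the number of nodes still to balance would exceed `maxDatanodesToBalance`. A minimal sketch of that bookkeeping, using hypothetical `String` node IDs and a `HashSet` for membership in place of the binary search in `containsNode()`. One reason to consider a set: the `containsNode()` quoted for discussion_r638690164 calls `listToSearch.get(0)` unconditionally, which would throw `IndexOutOfBoundsException` whenever `overUtilizedNodes` or `underUtilizedNodes` is empty.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: counts nodes balanced since the previous iteration. */
class BalancedCountSketch {

  /** Previously unbalanced nodes that are no longer over/under-utilized. */
  static long countNewlyBalanced(List<String> previousUnbalanced,
      List<String> overUtilized, List<String> underUtilized) {
    Set<String> stillUnbalanced = new HashSet<>(overUtilized);
    stillUnbalanced.addAll(underUtilized);
    long balanced = 0;
    for (String node : previousUnbalanced) {
      if (!stillUnbalanced.contains(node)) {
        balanced++;
      }
    }
    return balanced;
  }

  /** Mirrors the maxDatanodesToBalance check in initializeIteration(). */
  static boolean exceedsLimit(long balancedSoFar, long toBalance,
      int maxDatanodesToBalance) {
    return balancedSoFar + toBalance > maxDatanodesToBalance;
  }
}
```

Set lookups also avoid relying on the lists being sorted by the same comparator, which a binary search requires.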




[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629901382



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,305 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    boolean initialized = initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * below-average,and under-average utilizes nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes below the lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);

Review comment:
       Yes. However, the meaning of "source nodes" might change depending on the algorithm used to move containers. The term will be settled once the algorithm is finalized.
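The bookkeeping in the quoted hunk (count the previous iteration's source nodes that are no longer over- or under-utilized, then stop if `maxDatanodesToBalance` would be exceeded) can be sketched as below. This is a minimal, hypothetical sketch: node IDs stand in for `DatanodeUsageInfo`, and set membership stands in for the PR's `containsNode` helper.

```java
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of the per-iteration limit check in the quoted diff.
 * Strings stand in for DatanodeUsageInfo; Set.contains stands in for the
 * PR's containsNode(list, node) helper.
 */
final class IterationLimitSketch {

  private IterationLimitSketch() { }

  /** Counts previous source nodes that are in neither the over nor the under set. */
  static long countBalanced(List<String> previousSources,
      Set<String> overUtilized, Set<String> underUtilized) {
    long balanced = 0L;
    for (String node : previousSources) {
      if (!overUtilized.contains(node) && !underUtilized.contains(node)) {
        balanced++;
      }
    }
    return balanced;
  }

  /**
   * Returns true if balancing may continue. The diff stops the balancer when
   * numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance.
   */
  static boolean withinLimit(long balanced, long toBalance,
      long maxDatanodesToBalance) {
    return balanced + toBalance <= maxDatanodesToBalance;
  }
}
```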






[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r631595999



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -45,9 +48,27 @@
   private long maxSizeToMove;
   private boolean balancerRunning;
   private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> aboveAverageUtilizedNodes;
+  private List<DatanodeUsageInfo> belowAverageUtilizedNodes;

Review comment:
       Replacing these with a single list `withinThresholdUtilizedNodes`.
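A single `withinThresholdUtilizedNodes` list could replace the above-average and below-average lists roughly as follows. This is a hypothetical sketch: plain utilization values stand in for `DatanodeUsageInfo`, and the input is assumed to be sorted from most to least used, as `getMostOrLeastUsedDatanodes(true)` returns in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch: classify nodes into over-utilized, under-utilized and
 * a single within-threshold list. Doubles stand in for DatanodeUsageInfo.
 */
final class NodeClassificationSketch {
  final List<Double> overUtilized = new ArrayList<>();
  final List<Double> underUtilized = new ArrayList<>();
  final List<Double> withinThreshold = new ArrayList<>();

  /** Expects utilizations sorted from most to least used. */
  void classify(List<Double> utilizationsMostToLeastUsed,
      double average, double threshold) {
    double lowerLimit = average - threshold;
    double upperLimit = average + threshold;
    for (double utilization : utilizationsMostToLeastUsed) {
      if (utilization > upperLimit) {
        overUtilized.add(utilization);
      } else if (utilization < lowerLimit) {
        underUtilized.add(utilization);
      } else {
        // Everything in [lowerLimit, upperLimit], including nodes exactly at the average.
        withinThreshold.add(utilization);
      }
    }
    // Least-used first, matching Collections.reverse(underUtilizedNodes) in the diff.
    Collections.reverse(underUtilized);
  }
}
```

One behavioral difference worth noting: in the quoted diff, a node whose utilization equals the cluster average falls into neither the above- nor the below-average list, whereas this sketch keeps it in the within-threshold list.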






[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630786402



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {

Review comment:
       Can we divide this function into smaller functions, such as calculateAverageUtilisation and identifyOverAndUnderUtilizedNodes? That would avoid one large function.
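A minimal sketch of the first helpers such a split might produce is shown below. It assumes the cluster average is total used bytes divided by total capacity (suggested by the `clusterCapacity` and `clusterUsed` fields in `start()`), and that a zero total capacity raises `ArithmeticException`, since the diff catches that exception around `calculateAvgUtilization`. `Usage` is a hypothetical stand-in for `DatanodeUsageInfo`/`SCMNodeStat`.

```java
import java.util.List;

/**
 * Hypothetical decomposition of initializeIteration() into small helpers.
 * Usage is a stand-in for DatanodeUsageInfo / SCMNodeStat.
 */
final class InitializeIterationSketch {

  /** Minimal stand-in for a node's capacity and used bytes. */
  static final class Usage {
    final long capacity;
    final long used;

    Usage(long capacity, long used) {
      this.capacity = capacity;
      this.used = used;
    }
  }

  private InitializeIterationSketch() { }

  /** Assumed definition: total used bytes divided by total capacity. */
  static double calculateAvgUtilization(List<Usage> nodes) {
    long clusterCapacity = 0L;
    long clusterUsed = 0L;
    for (Usage node : nodes) {
      clusterCapacity += node.capacity;
      clusterUsed += node.used;
    }
    if (clusterCapacity == 0L) {
      // The diff catches ArithmeticException around this call.
      throw new ArithmeticException("Cluster capacity is zero");
    }
    return (double) clusterUsed / clusterCapacity;
  }

  /** Nodes with utilization below this limit are under-utilized. */
  static double lowerLimit(double average, double threshold) {
    return average - threshold;
  }

  /** Nodes with utilization above this limit are over-utilized. */
  static double upperLimit(double average, double threshold) {
    return average + threshold;
  }
}
```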

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes below the lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+
+    // calculate total number of nodes that have been balanced
+    numDatanodesBalanced =
+        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();

Review comment:
       We can add a method to the metrics class that increments a metric; then we do not need to fetch the current metric value first.
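The suggested increment method could look roughly like this. It is a hypothetical sketch: the PR's `ContainerBalancerMetrics` uses SCM's `LongMetric`, and `AtomicLong` plus the method name `incrementNumDatanodesBalanced` are assumptions made to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical metrics holder with an increment method, so callers do not
 * have to read the current value and set a new one. AtomicLong replaces the
 * PR's LongMetric purely for self-containment.
 */
final class BalancerMetricsSketch {
  private final AtomicLong numDatanodesBalanced = new AtomicLong();

  /** Adds delta and returns the new total. */
  long incrementNumDatanodesBalanced(long delta) {
    return numDatanodesBalanced.addAndGet(delta);
  }

  long getNumDatanodesBalanced() {
    return numDatanodesBalanced.get();
  }
}
```

With such a method, the two statements that fetch and re-add the metric value collapse into a single `incrementNumDatanodesBalanced(numDatanodesBalanced)` call.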

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);

Review comment:
       I think we will also need to handle safe mode: the balancer should not operate while SCM is in safe mode.
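A safe-mode guard could be checked both at start and at the beginning of each iteration. The sketch below is hypothetical: the `BooleanSupplier` stands in for however SCM exposes its safe-mode state, and no real SCM API is used or implied.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical safe-mode guard for the balancer. The BooleanSupplier is a
 * placeholder for SCM's actual safe-mode status source.
 */
final class SafeModeGuardSketch {
  private final BooleanSupplier inSafeMode;
  private boolean balancerRunning;

  SafeModeGuardSketch(BooleanSupplier inSafeMode) {
    this.inSafeMode = inSafeMode;
  }

  /** Starts only when SCM is out of safe mode; returns whether it started. */
  boolean start() {
    if (inSafeMode.getAsBoolean()) {
      return false;
    }
    balancerRunning = true;
    return true;
  }

  /** Checked before each iteration: stops the balancer if SCM is in safe mode. */
  boolean shouldContinueIteration() {
    if (inSafeMode.getAsBoolean()) {
      balancerRunning = false;
    }
    return balancerRunning;
  }

  boolean isBalancerRunning() {
    return balancerRunning;
  }
}
```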

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestContainerBalancer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancer.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManager;
+  private ContainerBalancer containerBalancer;
+  private MockNodeManager mockNodeManager;
+  private OzoneConfiguration conf;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    containerManager = Mockito.mock(ContainerManagerV2.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+
+    balancerConfiguration = new ContainerBalancerConfiguration();
+    balancerConfiguration.setThreshold("0.1");
+    balancerConfiguration.setMaxDatanodesToBalance(10);
+    balancerConfiguration.setMaxSizeToMove(500L);
+    conf.setFromObject(balancerConfiguration);
+
+    this.numberOfNodes = 10;
+    generateUtilizations(numberOfNodes);

Review comment:
       Nit: we can move the generateUtilizations call inside createNodesInCluster.
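Folding utilization generation into node creation leaves `setup()` with a single call. The fixture below is a hypothetical sketch: `CAPACITY`, `usedBytesPerNode`, and the uniform random distribution are assumptions, and raw byte counts stand in for the `DatanodeUsageInfo` objects the real test builds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical test fixture in which createNodesInCluster also generates the
 * utilizations, so setup() needs only one call.
 */
final class ClusterFixtureSketch {
  /** Hypothetical per-node capacity in bytes. */
  static final long CAPACITY = 1000L;

  /** Used bytes for each created node; every node has CAPACITY bytes. */
  final List<Long> usedBytesPerNode = new ArrayList<>();

  /** Creates nodes with random usage and returns the cluster's average utilization. */
  double createNodesInCluster(int numberOfNodes) {
    if (numberOfNodes <= 0) {
      throw new IllegalArgumentException("numberOfNodes must be positive");
    }
    long totalUsed = 0L;
    for (int i = 0; i < numberOfNodes; i++) {
      // Uniform value in [0, CAPACITY], i.e. a utilization in [0, 1].
      long used = ThreadLocalRandom.current().nextLong(0L, CAPACITY + 1);
      usedBytesPerNode.add(used);
      totalUsed += used;
    }
    return (double) totalUsed / ((double) CAPACITY * numberOfNodes);
  }
}
```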

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -45,9 +48,27 @@
   private long maxSizeToMove;
   private boolean balancerRunning;
   private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> aboveAverageUtilizedNodes;
+  private List<DatanodeUsageInfo> belowAverageUtilizedNodes;

Review comment:
       Can we avoid these lists? I think it might be simpler to maintain one other list or set containing the nodes that are categorised as neither over- nor under-utilized.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -45,58 +50,75 @@
   private int maxDatanodesToBalance = 5;
 
   @Config(key = "size.moved.max", type = ConfigType.LONG,
-      defaultValue = "10737418240L", tags = {ConfigTag.BALANCER},
-      description = "The maximum size of data in Bytes that will be moved " +
-          "by the Container Balancer.")
-  private long maxSizeToMove = 10737418240L;
+      defaultValue = "10", tags = {ConfigTag.BALANCER},
+      description = "The maximum size of data in GB that will be moved " +
+          "by Container Balancer.")
+  private long maxSizeToMove = 10;
 
   /**
-   * Get the threshold value for Container Balancer.
+   * Gets the threshold value for Container Balancer.
+   *
    * @return a fraction in the range 0 to 1
    */
   public double getThreshold() {
-    return threshold;
+    return Double.parseDouble(threshold);
   }
 
   /**
-   * Set the threshold value for Container Balancer.
+   * Sets the threshold value for Container Balancer.
+   *
    * @param threshold a fraction in the range 0 to 1
    */
-  public void setThreshold(double threshold) {
+  public void setThreshold(String threshold) {

Review comment:
       It can take a double as the argument.
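A setter that accepts a `double` can still keep the `String` field introduced by the diff. This is a hypothetical sketch: the `[0, 1]` validation is an assumption drawn from the Javadoc ("a fraction in the range 0 to 1"), and the `"0.1"` initial value is a placeholder, not the real default.

```java
/**
 * Hypothetical threshold config: the setter takes a double while the field
 * stays a String, as in the diff. The range check is an assumption.
 */
final class ThresholdConfigSketch {
  /** Placeholder initial value, not the real configuration default. */
  private String threshold = "0.1";

  /** Returns a fraction in the range 0 to 1. */
  double getThreshold() {
    return Double.parseDouble(threshold);
  }

  /** Accepts a fraction in the range 0 to 1; rejects NaN and out-of-range values. */
  void setThreshold(double threshold) {
    if (!(threshold >= 0.0 && threshold <= 1.0)) {
      throw new IllegalArgumentException(
          "Threshold must be a fraction in the range 0 to 1, got " + threshold);
    }
    this.threshold = String.valueOf(threshold);
  }
}
```

Callers such as the test would then write `setThreshold(0.1)` instead of `setThreshold("0.1")`.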

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
##########
@@ -139,6 +137,37 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
         initializeFakeNodes, nodeCount);
   }
 
+  public MockNodeManager(List<DatanodeUsageInfo> nodes)

Review comment:
       The functionality shared with the other constructor can be extracted into a separate method.
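The extraction could take the following shape, with both constructors delegating to one private helper. This is a hypothetical sketch: the real `MockNodeManager` constructors register `DatanodeDetails` and node stats, whereas strings and `registerNodes` are stand-ins introduced here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch of two constructors sharing one private setup method.
 * Strings stand in for DatanodeDetails.
 */
final class MockNodeManagerSketch {
  private final List<String> healthyNodes = new ArrayList<>();

  /** Creates nodeCount fake nodes. */
  MockNodeManagerSketch(int nodeCount) {
    List<String> nodes = new ArrayList<>();
    for (int i = 0; i < nodeCount; i++) {
      nodes.add("fake-dn-" + i);
    }
    registerNodes(nodes);
  }

  /** Uses the nodes supplied by the caller, e.g. a test. */
  MockNodeManagerSketch(List<String> nodes) {
    registerNodes(nodes);
  }

  /** Setup shared by both constructors. */
  private void registerNodes(List<String> nodes) {
    healthyNodes.addAll(nodes);
  }

  List<String> getHealthyNodes() {
    return Collections.unmodifiableList(healthyNodes);
  }
}
```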

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestContainerBalancer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancer.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManager;
+  private ContainerBalancer containerBalancer;
+  private MockNodeManager mockNodeManager;
+  private OzoneConfiguration conf;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    containerManager = Mockito.mock(ContainerManagerV2.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+
+    balancerConfiguration = new ContainerBalancerConfiguration();
+    balancerConfiguration.setThreshold("0.1");
+    balancerConfiguration.setMaxDatanodesToBalance(10);
+    balancerConfiguration.setMaxSizeToMove(500L);
+    conf.setFromObject(balancerConfiguration);
+
+    this.numberOfNodes = 10;
+    generateUtilizations(numberOfNodes);
+
+    // create datanodes with the generated nodeUtilization values
+    this.averageUtilization = createNodesInCluster();
+    mockNodeManager = new MockNodeManager(nodesInCluster);
+    containerBalancer = new ContainerBalancer(mockNodeManager, containerManager,
+        replicationManager, conf);
+  }
+
+  /**
+   * Checks whether ContainerBalancer is correctly updating the list of source
+   * nodes with varying values of Threshold.
+   */
+  @Test
+  public void initializeIterationShouldUpdateSourceNodesWhenThresholdChanges() {
+    List<DatanodeUsageInfo> expectedSourceNodes;
+    List<DatanodeUsageInfo> sourceNodesAccordingToBalancer;
+
+    // check for random threshold values
+    for (int i = 0; i < 50; i++) {
+      double randomThreshold = Math.random();
+
+      balancerConfiguration.setThreshold(String.valueOf(randomThreshold));
+      containerBalancer.start(balancerConfiguration);
+      expectedSourceNodes = determineExpectedSourceNodes(randomThreshold);
+      sourceNodesAccordingToBalancer = containerBalancer.getSourceNodes();
+
+      Assert.assertEquals(
+          expectedSourceNodes.size(), sourceNodesAccordingToBalancer.size());
+
+      for (int j = 0; j < expectedSourceNodes.size(); j++) {
+        Assert.assertEquals(expectedSourceNodes.get(j).getDatanodeDetails(),
+            sourceNodesAccordingToBalancer.get(j).getDatanodeDetails());
+      }
+      containerBalancer.stop();
+    }
+
+  }
+
+  /**
+   * Checks whether the list of source nodes is empty when the cluster is balanced.
+   */
+  @Test
+  public void sourceNodesListShouldBeEmptyWhenClusterIsBalanced() {
+    balancerConfiguration.setThreshold("0.99");
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertEquals(0, containerBalancer.getSourceNodes().size());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Checks whether ContainerBalancer stops when the limit of
+   * MaxDatanodesToBalance is reached.
+   */
+  @Test
+  public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
+    balancerConfiguration.setMaxDatanodesToBalance(2);
+    balancerConfiguration.setThreshold("0");
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertFalse(containerBalancer.isBalancerRunning());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Determines source nodes, that is, over and under utilized nodes,
+   * according to the generated utilization values for nodes and the threshold.
+   *
+   * @param threshold A fraction from range 0 to 1.
+   * @return List of DatanodeUsageInfo containing the expected(correct)
+   * source nodes.
+   */
+  private List<DatanodeUsageInfo> determineExpectedSourceNodes(
+      double threshold) {
+    double lowerLimit = averageUtilization - threshold;
+    double upperLimit = averageUtilization + threshold;
+
+    // use node utilizations to determine over and under utilized nodes
+    List<DatanodeUsageInfo> expectedSourceNodes = new ArrayList<>();
+    for (int i = 0; i < numberOfNodes; i++) {
+      if (nodeUtilizations.get(numberOfNodes - i - 1) > upperLimit) {

Review comment:
       Can we use nodesInCluster directly to determine the source nodes?
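Deriving the expected source nodes from the node list itself, rather than from a parallel utilization list, could look like this. It is a hypothetical sketch: `Node` stands in for `DatanodeUsageInfo`, and the ordering mirrors `initializeIteration` in the diff (over-utilized nodes most-used first, followed by under-utilized nodes least-used first).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical test helper: expected source nodes computed directly from the
 * nodes in the cluster. Node is a stand-in for DatanodeUsageInfo.
 */
final class ExpectedSourceNodesSketch {

  static final class Node {
    final String id;
    final long capacity;
    final long used;

    Node(String id, long capacity, long used) {
      this.id = id;
      this.capacity = capacity;
      this.used = used;
    }

    double utilization() {
      return (double) used / capacity;
    }
  }

  private ExpectedSourceNodesSketch() { }

  static List<Node> determineExpectedSourceNodes(List<Node> nodesInCluster,
      double averageUtilization, double threshold) {
    double lowerLimit = averageUtilization - threshold;
    double upperLimit = averageUtilization + threshold;

    // Most to least used, like getMostOrLeastUsedDatanodes(true).
    List<Node> sorted = new ArrayList<>(nodesInCluster);
    sorted.sort(Comparator.comparingDouble(Node::utilization).reversed());

    List<Node> over = new ArrayList<>();
    List<Node> under = new ArrayList<>();
    for (Node node : sorted) {
      double utilization = node.utilization();
      if (utilization > upperLimit) {
        over.add(node);
      } else if (utilization < lowerLimit) {
        under.add(node);
      }
    }
    // Least-used first, as Collections.reverse(underUtilizedNodes) does in the diff.
    Collections.reverse(under);

    List<Node> expected = new ArrayList<>(over);
    expected.addAll(under);
    return expected;
  }
}
```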

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -45,58 +50,75 @@
   private int maxDatanodesToBalance = 5;
 
   @Config(key = "size.moved.max", type = ConfigType.LONG,

Review comment:
       We can use ConfigType.SIZE here.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+
+    // calculate total number of nodes that have been balanced
+    numDatanodesBalanced =
+        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);
+
+      if (sourceNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(

Review comment:
      Comparator can return 0 for two different datanodes with the same utilisation. We will need to handle that case.
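One way to handle the tie case raised above is sketched below. It assumes the list is sorted by a utilization comparator and that node identity is decided separately by an ID. `NodeInfo` and its string `id` are hypothetical stand-ins for `DatanodeUsageInfo` and its `DatanodeDetails`. Once the binary search lands on any node whose utilization compares equal, the sketch scans the whole run of equal-utilization neighbours in both directions instead of assuming that the hit is the target.

```java
import java.util.Comparator;
import java.util.List;

/**
 * Sketch only: binary search that still finds the right node when several
 * nodes share the same utilization (the comparator returns 0 for them).
 * NodeInfo is a hypothetical stand-in for DatanodeUsageInfo.
 */
public class TieAwareSearch {

  public static final class NodeInfo {
    public final String id;            // stands in for the datanode's identity
    public final double utilization;   // used / capacity

    public NodeInfo(String id, double utilization) {
      this.id = id;
      this.utilization = utilization;
    }
  }

  /**
   * Returns true if target is in sorted. The list must be sorted according
   * to cmp; membership is decided by id, not by the comparator.
   */
  public static boolean containsNode(List<NodeInfo> sorted, NodeInfo target,
      Comparator<NodeInfo> cmp) {
    int low = 0;
    int high = sorted.size() - 1;
    while (low <= high) {
      int mid = (low + high) >>> 1;
      int c = cmp.compare(sorted.get(mid), target);
      if (c < 0) {
        low = mid + 1;
      } else if (c > 0) {
        high = mid - 1;
      } else {
        // Equal utilization does not mean the same node: check every tie,
        // first walking left from mid, then walking right.
        for (int i = mid; i >= 0 && cmp.compare(sorted.get(i), target) == 0; i--) {
          if (sorted.get(i).id.equals(target.id)) {
            return true;
          }
        }
        for (int i = mid + 1; i < sorted.size()
            && cmp.compare(sorted.get(i), target) == 0; i++) {
          if (sorted.get(i).id.equals(target.id)) {
            return true;
          }
        }
        return false;
      }
    }
    return false;
  }
}
```

A search that returns true on the first comparator match would report a false positive for a different node that merely shares the utilization. The tie scan adds O(k) work for a run of k ties. The comparator must match the list's actual order, so a reversed list such as `underUtilizedNodes` would need a reversed comparator.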






[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r639519492



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,347 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {

Review comment:
      This was initially designed to support configuration changes through a restart: an admin could restart the balancer, and the balancer would then load the new configuration values.
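The restart-to-reconfigure idea can be sketched as follows. This is a hypothetical illustration, not the Ozone implementation. It mirrors two things from the diff: `balancerRunning` becoming an `AtomicBoolean`, and `start` returning a `boolean` instead of throwing a `RuntimeException` as in the earlier revision. `Config` is a simplified stand-in for `ContainerBalancerConfiguration`. Because configuration is only read in `start`, new values take effect after a `stop()` followed by a `start(...)`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch: a balancer whose configuration is read only in
 * start(), so an admin changes settings by stopping and starting it again.
 */
public class RestartableBalancer {

  /** Simplified, hypothetical stand-in for ContainerBalancerConfiguration. */
  public static final class Config {
    public final double threshold;
    public final int maxDatanodesToBalance;

    public Config(double threshold, int maxDatanodesToBalance) {
      this.threshold = threshold;
      this.maxDatanodesToBalance = maxDatanodesToBalance;
    }
  }

  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
  private volatile Config config;

  /** Returns false, rather than throwing, if the balancer is already running. */
  public boolean start(Config newConfig) {
    // compareAndSet makes "check not running, then mark running" one atomic
    // step, so two concurrent callers cannot both start the balancer.
    if (!balancerRunning.compareAndSet(false, true)) {
      return false;
    }
    this.config = newConfig; // values are (re)loaded on every successful start
    return true;
  }

  public void stop() {
    balancerRunning.set(false);
  }

  public boolean isBalancerRunning() {
    return balancerRunning.get();
  }

  public Config getConfig() {
    return config;
  }
}
```

A second `start` while running leaves the old configuration in place. An admin who wants the new values calls `stop()` first.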






[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r630948944



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    aboveAverageUtilizedNodes.clear();
+    belowAverageUtilizedNodes.clear();
+    initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    try {
+      clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    } catch(ArithmeticException e) {
+      LOG.warn("Container Balancer failed to initialize an iteration", e);
+      return false;
+    }
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+
+    // calculate total number of nodes that have been balanced
+    numDatanodesBalanced =
+        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);
+
+      if (sourceNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" +
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(

Review comment:
       Good point.






[GitHub] [ozone] siddhantsangwan commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r631619226



##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestContainerBalancer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerBalancer.class);
+
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManager;
+  private ContainerBalancer containerBalancer;
+  private MockNodeManager mockNodeManager;
+  private OzoneConfiguration conf;
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private List<DatanodeUsageInfo> nodesInCluster;
+  private List<Double> nodeUtilizations;
+  private double averageUtilization;
+  private int numberOfNodes;
+
+  /**
+   * Sets up configuration values and creates a mock cluster.
+   */
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    containerManager = Mockito.mock(ContainerManagerV2.class);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+
+    balancerConfiguration = new ContainerBalancerConfiguration();
+    balancerConfiguration.setThreshold("0.1");
+    balancerConfiguration.setMaxDatanodesToBalance(10);
+    balancerConfiguration.setMaxSizeToMove(500L);
+    conf.setFromObject(balancerConfiguration);
+
+    this.numberOfNodes = 10;
+    generateUtilizations(numberOfNodes);
+
+    // create datanodes with the generated nodeUtilization values
+    this.averageUtilization = createNodesInCluster();
+    mockNodeManager = new MockNodeManager(nodesInCluster);
+    containerBalancer = new ContainerBalancer(mockNodeManager, containerManager,
+        replicationManager, conf);
+  }
+
+  /**
+   * Checks whether ContainerBalancer is correctly updating the list of source
+   * nodes with varying values of Threshold.
+   */
+  @Test
+  public void initializeIterationShouldUpdateSourceNodesWhenThresholdChanges() {
+    List<DatanodeUsageInfo> expectedSourceNodes;
+    List<DatanodeUsageInfo> sourceNodesAccordingToBalancer;
+
+    // check for random threshold values
+    for (int i = 0; i < 50; i++) {
+      double randomThreshold = Math.random();
+
+      balancerConfiguration.setThreshold(String.valueOf(randomThreshold));
+      containerBalancer.start(balancerConfiguration);
+      expectedSourceNodes = determineExpectedSourceNodes(randomThreshold);
+      sourceNodesAccordingToBalancer = containerBalancer.getSourceNodes();
+
+      Assert.assertEquals(
+          expectedSourceNodes.size(), sourceNodesAccordingToBalancer.size());
+
+      for (int j = 0; j < expectedSourceNodes.size(); j++) {
+        Assert.assertEquals(expectedSourceNodes.get(j).getDatanodeDetails(),
+            sourceNodesAccordingToBalancer.get(j).getDatanodeDetails());
+      }
+      containerBalancer.stop();
+    }
+
+  }
+
+  /**
+   * Checks whether the list of source is empty when the cluster is balanced.
+   */
+  @Test
+  public void sourceNodesListShouldBeEmptyWhenClusterIsBalanced() {
+    balancerConfiguration.setThreshold("0.99");
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertEquals(0, containerBalancer.getSourceNodes().size());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Checks whether ContainerBalancer stops when the limit of
+   * MaxDatanodesToBalance is reached.
+   */
+  @Test
+  public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
+    balancerConfiguration.setMaxDatanodesToBalance(2);
+    balancerConfiguration.setThreshold("0");
+    containerBalancer.start(balancerConfiguration);
+
+    Assert.assertFalse(containerBalancer.isBalancerRunning());
+    containerBalancer.stop();
+  }
+
+  /**
+   * Determines source nodes, that is, over and under utilized nodes,
+   * according to the generated utilization values for nodes and the threshold.
+   *
+   * @param threshold A fraction from range 0 to 1.
+   * @return List of DatanodeUsageInfo containing the expected(correct)
+   * source nodes.
+   */
+  private List<DatanodeUsageInfo> determineExpectedSourceNodes(
+      double threshold) {
+    double lowerLimit = averageUtilization - threshold;
+    double upperLimit = averageUtilization + threshold;
+
+    // use node utilizations to determine over and under utilized nodes
+    List<DatanodeUsageInfo> expectedSourceNodes = new ArrayList<>();
+    for (int i = 0; i < numberOfNodes; i++) {
+      if (nodeUtilizations.get(numberOfNodes - i - 1) > upperLimit) {

Review comment:
      Using a separate method lets us pass varying thresholds as an argument and determine the expected list for each threshold.
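A threshold-parameterised helper in the spirit of `determineExpectedSourceNodes` is sketched below. It works on plain arrays of per-node used and capacity bytes, which is an assumption made for self-containment. The cluster average is taken as total used divided by total capacity, which is also an assumption. It matches a plain mean of node utilizations only when all capacities are equal. Nodes above `avg + threshold` are over-utilized and nodes below `avg - threshold` are under-utilized, matching the limits in the diff. The sketch returns indices in input order rather than reproducing the balancer's most-to-least-used ordering and list reversal.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: expected source nodes (over-utilized first, then under-utilized)
 * for a given threshold. Capacities are assumed to be positive.
 */
public class ExpectedSourceNodes {

  /** Cluster utilization as total used / total capacity (an assumption). */
  public static double clusterUtilization(long[] used, long[] capacity) {
    long totalUsed = 0L;
    long totalCapacity = 0L;
    for (int i = 0; i < used.length; i++) {
      totalUsed += used[i];
      totalCapacity += capacity[i];
    }
    if (totalCapacity == 0L) {
      throw new ArithmeticException("cluster capacity is zero");
    }
    return (double) totalUsed / totalCapacity;
  }

  /** Indices of over-utilized nodes followed by under-utilized nodes. */
  public static List<Integer> sourceNodes(long[] used, long[] capacity,
      double threshold) {
    double avg = clusterUtilization(used, capacity);
    double lowerLimit = avg - threshold;
    double upperLimit = avg + threshold;
    List<Integer> over = new ArrayList<>();
    List<Integer> under = new ArrayList<>();
    for (int i = 0; i < used.length; i++) {
      double utilization = (double) used[i] / capacity[i];
      if (utilization > upperLimit) {
        over.add(i);
      } else if (utilization < lowerLimit) {
        under.add(i);
      }
      // nodes within [lowerLimit, upperLimit] are not sources
    }
    List<Integer> result = new ArrayList<>(over);
    result.addAll(under);
    return result;
  }
}
```

With four 100-byte nodes using 90, 50, 50 and 10 bytes, the average is 0.5. A threshold of 0.1 then yields nodes 0 and 3, while a threshold of 0.5 yields none. Calling the helper in a loop over random thresholds gives the expected list for each one, as the comment suggests.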






[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r639533534



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,347 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {

Review comment:
      @JacksonYao287 if there are other suggestions for this function, we can take them up in #2278. If that is OK, I will commit this PR.






[GitHub] [ozone] siddhantsangwan closed pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
siddhantsangwan closed pull request #2230:
URL: https://github.com/apache/ozone/pull/2230


   




[GitHub] [ozone] GlenGeng commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
GlenGeng commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r639542307



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,347 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {

Review comment:
      According to our design decision, the parameters will be passed from the command line, which will be implemented by https://github.com/apache/ozone/pull/2278, so the configuration-related code will be removed in the future.






[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r635049406



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -205,11 +202,10 @@ private boolean initializeIteration() {
         numDatanodesBalanced += 1;
       }
     }
-
-    // calculate total number of nodes that have been balanced
+    // calculate total number of nodes that have been balanced so far
     numDatanodesBalanced =
-        numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();
-    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+        metrics.addToNumDatanodesBalanced(numDatanodesBalanced);

Review comment:
       NIT: Change fn name to `incroNumDatanodesBalanced`
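The removed lines read the running total from the metrics object, added to it, and stored a new `LongMetric`. The new code delegates to a single method that adds to the total and returns it, which is what an `incr...`-style name describes. Below is a minimal sketch of that pattern. It assumes a plain `AtomicLong` instead of the real `ContainerBalancerMetrics`/`LongMetric` types. `BalancerMetricsSketch` and `incrNumDatanodesBalanced` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of an increment-style metric. This is NOT the real
 * ContainerBalancerMetrics API; an AtomicLong stands in for LongMetric.
 */
public class BalancerMetricsSketch {
  // Running total of datanodes balanced across all iterations.
  private final AtomicLong numDatanodesBalanced = new AtomicLong();

  /**
   * Adds delta to the running total and returns the new total. The
   * read-modify-write is a single atomic operation, unlike a separate
   * get() followed by a set().
   */
  public long incrNumDatanodesBalanced(long delta) {
    return numDatanodesBalanced.addAndGet(delta);
  }

  public long getNumDatanodesBalanced() {
    return numDatanodesBalanced.get();
  }

  public static void main(String[] args) {
    BalancerMetricsSketch metrics = new BalancerMetricsSketch();
    metrics.incrNumDatanodesBalanced(2);              // first iteration
    long total = metrics.incrNumDatanodesBalanced(3); // second iteration
    System.out.println("Total datanodes balanced: " + total); // prints 5
  }
}
```

Returning the new total lets the caller log or compare it against `maxDatanodesToBalance` without a second read.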

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,341 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
-
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe Mode.");
+      return false;
+    }

Review comment:
      This check should be moved into the initialise function, so that the balancer skips balancing while SCM is in safe mode.
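Here is a minimal sketch of the suggested split. `start()` only guards against a second concurrent start. Each iteration's initialisation then skips balancing while SCM is in safe mode. A `BooleanSupplier` stands in for `SCMContext#isInSafeMode`, and `BalancerStartSketch` is a hypothetical class, not the actual `ContainerBalancer`. This arrangement also avoids a problem in the diff above: there, `compareAndSet` has already set `balancerRunning` to true before the safe-mode check returns false, which leaves the flag set even though balancing never started.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical sketch: start() guards only against a second start, while
 * each iteration checks safe mode. The BooleanSupplier stands in for
 * SCMContext#isInSafeMode (an assumption, not the real wiring).
 */
public class BalancerStartSketch {
  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
  private final BooleanSupplier inSafeMode;

  public BalancerStartSketch(BooleanSupplier inSafeMode) {
    this.inSafeMode = inSafeMode;
  }

  /** Returns false if the balancer is already running. */
  public boolean start() {
    // Atomically flips false -> true; fails if the balancer is already running.
    return balancerRunning.compareAndSet(false, true);
  }

  /**
   * Returns false, skipping this iteration, while SCM is in safe mode.
   * The balancer stays running and can try again on the next iteration.
   */
  public boolean initializeIteration() {
    if (inSafeMode.getAsBoolean()) {
      return false;
    }
    // Node classification would happen here.
    return true;
  }

  public void stop() {
    balancerRunning.set(false);
  }

  public boolean isBalancerRunning() {
    return balancerRunning.get();
  }

  public static void main(String[] args) {
    AtomicBoolean safeMode = new AtomicBoolean(true);
    BalancerStartSketch balancer = new BalancerStartSketch(safeMode::get);
    System.out.println(balancer.start());               // true: first start
    System.out.println(balancer.start());               // false: already running
    System.out.println(balancer.initializeIteration()); // false: in safe mode
    safeMode.set(false);
    System.out.println(balancer.initializeIteration()); // true: out of safe mode
  }
}
```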

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -23,80 +23,95 @@
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class contains configuration values for the ContainerBalancer.
  */
 @ConfigGroup(prefix = "hdds.container.balancer.")
 public final class ContainerBalancerConfiguration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerConfiguration.class);
+
   @Config(key = "utilization.threshold", type = ConfigType.AUTO, defaultValue =
       "0.1", tags = {ConfigTag.BALANCER},
       description = "Threshold is a fraction in the range of 0 to 1. A " +
           "cluster is considered balanced if for each datanode, the " +
           "utilization of the datanode (used space to capacity ratio) differs" +
           " from the utilization of the cluster (used space to capacity ratio" +
           " of the entire cluster) no more than the threshold value.")
-  private double threshold = 0.1;
+  private String threshold = "0.1";
 
   @Config(key = "datanodes.balanced.max", type = ConfigType.INT,
       defaultValue = "5", tags = {ConfigTag.BALANCER}, description = "The " +
       "maximum number of datanodes that should be balanced. Container " +
       "Balancer will not balance more number of datanodes than this limit.")
   private int maxDatanodesToBalance = 5;
 
-  @Config(key = "size.moved.max", type = ConfigType.LONG,
-      defaultValue = "10737418240L", tags = {ConfigTag.BALANCER},
-      description = "The maximum size of data in Bytes that will be moved " +
-          "by the Container Balancer.")
-  private long maxSizeToMove = 10737418240L;
+  @Config(key = "size.moved.max", type = ConfigType.SIZE,
+      defaultValue = "10", tags = {ConfigTag.BALANCER},

Review comment:
      Please see how ConfigType.SIZE is used in other places. I think the default value should be `10GB`. SIZE is a more flexible config type and is not limited to GB alone, so the description and javadoc for the corresponding functions can be updated to remove the GB-specific wording.
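The old default `10737418240L` is exactly 10 × 1024³ bytes. A `10GB` default with binary (1024-based) units therefore keeps the same value while letting users write values such as `512MB` or `1TB`. The following is a minimal, hypothetical parser that illustrates this conversion. Ozone's own SIZE handling lives in its configuration framework and may accept different suffixes or rounding rules. `SizeConfigSketch` and `parseSizeToBytes` are illustrative names only.

```java
import java.util.Locale;

/**
 * Hypothetical parser for size strings such as "10GB". This is not Ozone's
 * configuration code. Units are assumed to be binary (1 KB = 1024 bytes).
 */
public final class SizeConfigSketch {
  private SizeConfigSketch() { }

  public static long parseSizeToBytes(String value) {
    String s = value.trim().toUpperCase(Locale.ROOT);
    // Check longer suffixes first so that "B" does not swallow "GB".
    String[] units = {"TB", "GB", "MB", "KB", "B"};
    long[] multipliers = {1L << 40, 1L << 30, 1L << 20, 1L << 10, 1L};
    for (int i = 0; i < units.length; i++) {
      if (s.endsWith(units[i])) {
        String number = s.substring(0, s.length() - units[i].length()).trim();
        return Math.round(Double.parseDouble(number) * multipliers[i]);
      }
    }
    // No unit suffix: treat the value as a plain byte count.
    return Long.parseLong(s);
  }

  public static void main(String[] args) {
    System.out.println(parseSizeToBytes("10GB"));  // 10737418240
    System.out.println(parseSizeToBytes("512MB")); // 536870912
  }
}
```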






[GitHub] [ozone] linyiqun commented on a change in pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
linyiqun commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629879887



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,305 @@ public ContainerBalancer(
     this.ozoneConfiguration = ozoneConfiguration;
     this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
   public void start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (balancerRunning) {
+      LOG.info("Container Balancer is already running.");
+      throw new RuntimeException();
+    }
     this.balancerRunning = true;
-
     ozoneConfiguration = new OzoneConfiguration();
 
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.aboveAverageUtilizedNodes = new ArrayList<>();
+    this.belowAverageUtilizedNodes = new ArrayList<>();
+    this.sourceNodes = new ArrayList<>();
+
     LOG.info("Starting Container Balancer...");
+    LOG.info(toString());
+
+    balance();
+  }
 
-    // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    boolean initialized = initializeIteration();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under,
+   * above-average, and below-average utilized nodes. Decides whether
+   * balancing needs to continue or should be stopped.
+   */
+  private boolean initializeIteration() {
+    List<DatanodeUsageInfo> nodes;
+    try {
+      // sorted list in order from most to least used
+      nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+    } catch (NullPointerException e) {
+      LOG.error("Container Balancer could not retrieve nodes from Node " +
+          "Manager.", e);
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(nodes);
+    LOG.info("Average utilization of the cluster is {}", clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
+    double upperLimit = clusterAvgUtilisation + threshold;
+
     LOG.info("Lower limit for utilization is {}", lowerLimit);
     LOG.info("Upper limit for utilization is {}", upperLimit);
 
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo node : nodes) {
+      double utilization = calculateUtilization(node);
+      if (utilization > clusterAvgUtilisation) {
+        if (utilization > upperLimit) {
+          overUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes greater than upper limit in this node
+          overLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  utilization) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      upperLimit);
+        } else {
+          aboveAverageUtilizedNodes.add(node);
+        }
+      } else if (utilization < clusterAvgUtilisation) {
+        if (utilization < lowerLimit) {
+          underUtilizedNodes.add(node);
+          numDatanodesToBalance += 1;
+
+          // amount of bytes lesser than lower limit in this node
+          underLoadedBytes +=
+              ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                  lowerLimit) -
+                  ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+                      utilization);
+        } else {
+          belowAverageUtilizedNodes.add(node);
+        }
+      }
+    }
+
+    Collections.reverse(underUtilizedNodes);
+    Collections.reverse(belowAverageUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : sourceNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+    sourceNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      sourceNodes.addAll(overUtilizedNodes);
+      sourceNodes.addAll(underUtilizedNodes);

Review comment:
       >So here the term 'source nodes' has been used for nodes that need to be balanced. 
   
   Okay, so the meaning of 'source node' is a little different from before.
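The classification in `initializeIteration()` can be sketched in isolation. Here, "source nodes" means every node outside the threshold band, whether over- or under-utilized. The sketch makes three assumptions. First, following the threshold description in `ContainerBalancerConfiguration`, cluster utilization is total used space divided by total capacity. Second, `ratioToBytes` multiplies a node's capacity by a ratio. Third, `NodeClassifierSketch` and its `Node` type are hypothetical stand-ins for the balancer and `DatanodeUsageInfo`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of per-iteration node classification. */
public final class NodeClassifierSketch {

  /** Minimal stand-in for DatanodeUsageInfo: capacity and used bytes. */
  public static final class Node {
    public final String id;
    public final long capacity;
    public final long used;

    public Node(String id, long capacity, long used) {
      this.id = id;
      this.capacity = capacity;
      this.used = used;
    }

    double utilization() {
      return (double) used / capacity;
    }
  }

  public final List<Node> overUtilized = new ArrayList<>();
  public final List<Node> aboveAverage = new ArrayList<>();
  public final List<Node> belowAverage = new ArrayList<>();
  public final List<Node> underUtilized = new ArrayList<>();
  public double overLoadedBytes;  // bytes above the upper limit
  public double underLoadedBytes; // bytes missing below the lower limit

  public NodeClassifierSketch(List<Node> nodes, double threshold) {
    long totalUsed = 0;
    long totalCapacity = 0;
    for (Node n : nodes) {
      totalUsed += n.used;
      totalCapacity += n.capacity;
    }
    if (totalCapacity == 0) {
      throw new IllegalArgumentException("cluster has no capacity");
    }
    // Cluster utilization: used space to capacity ratio of the whole cluster.
    double avg = (double) totalUsed / totalCapacity;
    double upper = avg + threshold;
    double lower = avg - threshold;
    for (Node n : nodes) {
      double u = n.utilization();
      if (u > upper) {
        overUtilized.add(n);
        overLoadedBytes += n.capacity * (u - upper);
      } else if (u > avg) {
        aboveAverage.add(n);
      } else if (u < lower) {
        underUtilized.add(n);
        underLoadedBytes += n.capacity * (lower - u);
      } else if (u < avg) {
        belowAverage.add(n);
      }
      // A node exactly at the cluster average lands in no list, as in the diff.
    }
  }

  /** Balanced when no node lies outside [avg - threshold, avg + threshold]. */
  public boolean isBalanced() {
    return overUtilized.isEmpty() && underUtilized.isEmpty();
  }

  /** Mirrors the diff's stop condition against maxDatanodesToBalance. */
  public boolean exceedsLimit(long alreadyBalanced, int maxDatanodesToBalance) {
    long toBalance = overUtilized.size() + underUtilized.size();
    return alreadyBalanced + toBalance > maxDatanodesToBalance;
  }
}
```

With four 100-byte nodes holding 90, 55, 45 and 10 bytes and a threshold of 0.1, the cluster average is 0.5. The first node is over-utilized, the second is above average, the third is below average, and the fourth is under-utilized, so two source nodes need balancing.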






[GitHub] [ozone] lokeshj1703 closed pull request #2230: HDDS-4927. Determine over and under utilized datanodes in Container Balancer.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 closed pull request #2230:
URL: https://github.com/apache/ozone/pull/2230


   




[GitHub] [ozone] lokeshj1703 commented on pull request #2230: HDDS-4927. Add support for initializing an iteration in ContainerBalancer. Add unit tests.

Posted by GitBox <gi...@apache.org>.
lokeshj1703 commented on pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#issuecomment-838094543


   @siddhantsangwan The PR shows commits from HDDS-4925 as well. Can you please take a look?

