Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/07/27 07:47:24 UTC

[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2441: HDDS-4929. Select target datanodes and containers to move for Container Balancer

lokeshj1703 commented on a change in pull request #2441:
URL: https://github.com/apache/ozone/pull/2441#discussion_r676495944



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -113,49 +130,50 @@ public boolean start(
     lock.lock();
     try {
       if (!balancerRunning.compareAndSet(false, true)) {
-        LOG.info("Container Balancer is already running.");
+        LOG.error("Container Balancer is already running.");
         return false;
       }
-      
+
+      ozoneConfiguration = new OzoneConfiguration();
       this.config = balancerConfiguration;
-      this.idleIteration = config.getIdleIteration();
-      this.threshold = config.getThreshold();
-      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
-      this.unBalancedNodes = new ArrayList<>();
       LOG.info("Starting Container Balancer...{}", this);
+
       //we should start a new balancer thread async
       //and response to cli as soon as possible
 
 
       //TODO: this is a temporary implementation
       //modify this later
-      currentBalancingThread = new Thread(() -> balance());
+      currentBalancingThread = new Thread(this::balance);
       currentBalancingThread.start();
       ////////////////////////
     } finally {
       lock.unlock();
     }
 
-
     return true;
   }
 
   /**
    * Balances the cluster.
    */
   private void balance() {
-    for (int i = 0; i < idleIteration; i++) {
-      if (!initializeIteration()) {
+    countDatanodesBalanced = 0;
+    totalSizeMoved = 0;
+    int i = 0;
+    do {
+      this.idleIteration = config.getIdleIteration();
+      this.threshold = config.getThreshold();
+      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
+      this.maxSizeToMove = config.getMaxSizeToMove();
+
+      if (!initializeIteration() || !doIteration()) {
         //balancer should be stopped immediately
         break;
       }
-      // unBalancedNodes is not cleared since the next iteration uses this
-      // iteration's unBalancedNodes to find out how many nodes were balanced
-      overUtilizedNodes.clear();
-      underUtilizedNodes.clear();
-      withinThresholdUtilizedNodes.clear();
-    }
+      i++;
+    } while (i < idleIteration);

Review comment:
       We should have a configurable sleep period between iterations. We should also check that the sleep period is greater than the datanodes' du refresh period, so that each iteration sees refreshed usage numbers.
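
A minimal sketch of the suggested validation, assuming both periods are available as `java.time.Duration` values. `IterationInterval`, `validate` and `sleep` are hypothetical names for illustration, not existing Ozone classes or configuration keys; the actual du refresh period would have to be read from the datanode configuration.

```java
import java.time.Duration;

/**
 * Hypothetical holder for the sleep between balancer iterations.
 * Not an Ozone class; names are for illustration only.
 */
final class IterationInterval {
  private final Duration sleepBetweenIterations;

  private IterationInterval(Duration sleepBetweenIterations) {
    this.sleepBetweenIterations = sleepBetweenIterations;
  }

  /**
   * Accepts the configured sleep only if it is strictly longer than the
   * datanode du refresh period, so usage numbers can refresh before the
   * next iteration reads them.
   */
  static IterationInterval validate(Duration sleep, Duration duRefreshPeriod) {
    if (sleep.compareTo(duRefreshPeriod) <= 0) {
      throw new IllegalArgumentException("Sleep between iterations (" + sleep
          + ") must be greater than the du refresh period ("
          + duRefreshPeriod + ")");
    }
    return new IterationInterval(sleep);
  }

  Duration get() {
    return sleepBetweenIterations;
  }

  /** Sleeps between iterations; returns false if the thread was interrupted. */
  boolean sleep() {
    try {
      Thread.sleep(sleepBetweenIterations.toMillis());
      return true;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the balancer loop can stop.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Validating once when the balancer starts would let the CLI reject a bad value immediately instead of failing inside the background thread.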

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -84,25 +104,22 @@ public ContainerBalancer(
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
       OzoneConfiguration ozoneConfiguration,
-      final SCMContext scmContext) {
+      final SCMContext scmContext,
+      PlacementPolicy placementPolicy) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
     this.config = new ContainerBalancerConfiguration();
     this.metrics = new ContainerBalancerMetrics();
     this.scmContext = scmContext;

Review comment:
       It might make sense to have an Iteration class to store iteration-related fields. Ideally, ContainerBalancer should only store fields that are used across iterations.
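
One possible shape for that split, sketched under the assumption that node identity can be reduced to a String (the real code would use `DatanodeDetails`/`DatanodeUsageInfo`). `BalancerIteration` and `BalancerState` are hypothetical names; the field names mirror those in the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-iteration state; discarded when the iteration ends. */
final class BalancerIteration {
  final List<String> overUtilizedNodes = new ArrayList<>();
  final List<String> underUtilizedNodes = new ArrayList<>();
  final List<String> withinThresholdUtilizedNodes = new ArrayList<>();
  final Map<String, Long> sizeLeavingNode = new HashMap<>();
  final Map<String, Long> sizeEnteringNode = new HashMap<>();
  long sizeMovedInIteration;

  /** Records a planned move of sizeBytes from source to target. */
  void recordMove(String source, String target, long sizeBytes) {
    sizeLeavingNode.merge(source, sizeBytes, Long::sum);
    sizeEnteringNode.merge(target, sizeBytes, Long::sum);
    sizeMovedInIteration += sizeBytes;
  }
}

/** Hypothetical balancer-level state that survives across iterations. */
final class BalancerState {
  long totalSizeMoved;
  int iterationsRun;

  /** Starts a fresh iteration, so no lists or maps need clearing by hand. */
  BalancerIteration newIteration() {
    iterationsRun++;
    return new BalancerIteration();
  }

  /** Folds the finished iteration's totals into the long-lived state. */
  void finishIteration(BalancerIteration iteration) {
    totalSizeMoved += iteration.sizeMovedInIteration;
  }
}
```

A side benefit is that the `clear()` calls removed in this diff are no longer needed at all: each iteration simply starts with a new object.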

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -200,6 +228,8 @@ private boolean initializeIteration() {
     // find over and under utilized nodes
     for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
       double utilization = calculateUtilization(datanodeUsageInfo);
+      LOG.info("Utilization for node {} is {}",
+          datanodeUsageInfo.getDatanodeDetails().getUuidString(), utilization);

Review comment:
       This can be a debug log. Similarly, there are other statements that can be made debug logs. They can further be enclosed in a `LOG.isDebugEnabled()` check.
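
With SLF4J the suggestion would read `if (LOG.isDebugEnabled()) { LOG.debug("Utilization for node {} is {}", ...); }`. The guard matters because parameterized logging defers formatting but still evaluates the arguments, such as `getUuidString()`. To keep the sketch runnable without extra dependencies, it swaps in `java.util.logging`, where `isLoggable(Level.FINE)` plays the role of `isDebugEnabled()`. `GuardedDebugLog` and `describeNode` are hypothetical; the counter only exists to show that the argument is not built when debug logging is off.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical example of a guarded debug-level log statement. */
final class GuardedDebugLog {
  static final Logger LOG = Logger.getLogger(GuardedDebugLog.class.getName());

  /** Counts how often the (potentially expensive) log argument is built. */
  static final AtomicInteger ARGUMENT_EVALUATIONS = new AtomicInteger();

  static String describeNode(String uuid) {
    ARGUMENT_EVALUATIONS.incrementAndGet();
    return "node-" + uuid;
  }

  static void logUtilization(String uuid, double utilization) {
    // FINE is the java.util.logging counterpart of SLF4J's DEBUG level.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine(String.format("Utilization for node %s is %f",
          describeNode(uuid), utilization));
    }
  }
}
```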

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -141,18 +200,231 @@ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
    */
   @Test
   public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
-    balancerConfiguration.setMaxDatanodesToBalance(2);
+    balancerConfiguration.setMaxDatanodesToBalance(4);
+    balancerConfiguration.setThreshold(0.01);
+    containerBalancer.start(balancerConfiguration);
+
+    // waiting for balance completed.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {}
+
+    Assert.assertFalse(containerBalancer.isBalancerRunning());
+    containerBalancer.stop();
+  }
+
+  @Test
+  public void containerBalancerShouldSelectOnlyClosedContainers() {
+    // make all containers open, balancer should not select any of them
+    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
+      containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
+    }
     balancerConfiguration.setThreshold(0.1);
     containerBalancer.start(balancerConfiguration);
 
     // waiting for balance completed.
     // TODO: this is a temporary implementation for now
     // modify this after balancer is fully completed
     try {
-      Thread.sleep(3000);
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {}
+
+    containerBalancer.stop();
+
+    // balancer should have identified unbalanced nodes
+    Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
+    // no container should have been selected
+    Assert.assertTrue(containerBalancer.getSourceToTargetMap().isEmpty());
+
+    // now, close all containers
+    for (ContainerInfo containerInfo : cidToInfoMap.values()) {
+      containerInfo.setState(HddsProtos.LifeCycleState.CLOSED);
+    }
+    containerBalancer.start(balancerConfiguration);
+
+    // waiting for balance completed.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {}
+
+    containerBalancer.stop();
+    // check whether all selected containers are closed
+    for (ContainerMoveSelection moveSelection:
+         containerBalancer.getSourceToTargetMap().values()) {
+      Assert.assertSame(
+          cidToInfoMap.get(moveSelection.getContainerID()).getState(),
+          HddsProtos.LifeCycleState.CLOSED);
+    }
+  }
+
+  @Test
+  public void containerBalancerShouldStopWhenMaxSizeToMoveIsReached() {
+    balancerConfiguration.setThreshold(0.01);
+    balancerConfiguration.setMaxSizeToMove(10 * OzoneConsts.GB);
+    containerBalancer.start(balancerConfiguration);
+
+    // waiting for balance completed.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(1000);
     } catch (InterruptedException e) {}
 
     Assert.assertFalse(containerBalancer.isBalancerRunning());
+    containerBalancer.stop();

Review comment:
       Can we add an assertion with respect to the max size to move?
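
A sketch of the kind of check the test could make, assuming the moved containers' sizes can be collected after the run (in the test, presumably by looking up each `getContainerID()` from `getSourceToTargetMap().values()` in `cidToInfoMap` and reading its used bytes). `MaxSizeCheck` and its nested `Move` type are hypothetical stand-ins for `ContainerMoveSelection` and `ContainerInfo`.

```java
import java.util.List;

/** Hypothetical helper for asserting the maxSizeToMove limit in a test. */
final class MaxSizeCheck {
  /** A planned move: container id and the container's used bytes. */
  static final class Move {
    final long containerId;
    final long usedBytes;

    Move(long containerId, long usedBytes) {
      this.containerId = containerId;
      this.usedBytes = usedBytes;
    }
  }

  /** Sums the used bytes of all selected containers. */
  static long totalBytes(List<Move> moves) {
    long total = 0;
    for (Move move : moves) {
      total += move.usedBytes;
    }
    return total;
  }

  /** Fails if the selected containers add up to more than the limit. */
  static void assertWithinMaxSize(List<Move> moves, long maxSizeToMove) {
    long total = totalBytes(moves);
    if (total > maxSizeToMove) {
      throw new AssertionError(
          "Moved " + total + " bytes, but the limit is " + maxSizeToMove);
    }
  }
}
```

Asserting on the summed sizes, rather than only on `isBalancerRunning()`, would catch a balancer that stops for an unrelated reason.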

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -44,23 +49,39 @@
           " of the entire cluster) no more than the threshold value.")
   private String threshold = "0.1";
 
-  @Config(key = "datanodes.balanced.max", type = ConfigType.INT,
+  @Config(key = "datanodes.balanced.max.per.iteration", type = ConfigType.INT,

Review comment:
       It might be better to express this as a percentage of the total number of datanodes in the cluster.
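
A sketch of converting such a percentage into the absolute limit the balancer uses, assuming the percentage is configured as an integer between 0 and 100. `DatanodeLimit.maxDatanodesToBalance` is a hypothetical helper, not an existing Ozone method.

```java
/** Hypothetical conversion from a percentage to a datanode count. */
final class DatanodeLimit {
  /**
   * Converts a percentage of the cluster's datanodes into an absolute limit.
   * Integer division rounds down, so the limit never exceeds the requested
   * percentage.
   */
  static int maxDatanodesToBalance(int percentage, int totalDatanodes) {
    if (percentage < 0 || percentage > 100) {
      throw new IllegalArgumentException(
          "percentage must be in [0, 100]: " + percentage);
    }
    // Widen to long before multiplying to avoid int overflow.
    return (int) ((long) totalDatanodes * percentage / 100);
  }
}
```

Rounding down can yield 0 on small clusters; since every move involves a source and a target, an implementation might want to clamp the result to at least 2.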

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +
+        " balanced.");
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
+    return true;
+  }
+
+  private boolean doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      if (!checkConditionsForBalancing()) {
+        LOG.info("Conditions for balancing failed. Stopping Container " +
+            "Balancer...");
         return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" +
-            " balanced.");
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        // move container
+        // if move successful, do the following
+        potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+            selectedTargets, moveSelection, source);
+      }
+    }
+
+    // if not all underUtilized nodes have been selected, try to match
+    // withinThresholdUtilized nodes with underUtilized nodes
+    if (selectedTargets.size() < underUtilizedNodes.size()) {
+      potentialTargets.removeAll(selectedTargets);
+      Collections.reverse(withinThresholdUtilizedNodes);
+
+      for (DatanodeUsageInfo datanodeUsageInfo : withinThresholdUtilizedNodes) {
+        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+        if (!checkConditionsForBalancing()) {
+          LOG.info("Conditions for balancing failed. Stopping Container " +
+              "Balancer...");
+          return false;
+        }
+
+        ContainerMoveSelection moveSelection = matchSourceWithTarget(source,
+            potentialTargets);
+
+        if (moveSelection != null) {
+          LOG.info("ContainerBalancer is trying to move container {} from " +
+                  "source datanode {} to target datanode {}",
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
+              moveSelection.getTargetNode().getUuidString());
+
+          // move container
+          // if move successful, do the following
+          potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+              selectedTargets, moveSelection, source);
+        }
       }
     }
     return true;
   }
 
   /**
-   * Performs binary search to determine if the specified listToSearch
-   * contains the specified node.
-   *
-   * @param listToSearch List of DatanodeUsageInfo to be searched.
-   * @param node DatanodeUsageInfo to be searched for.
-   * @return true if the specified node is present in listToSearch, otherwise
-   * false.
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   * @param potentialTargets List of potential targets to move container to
+   * @return ContainerMoveSelection containing the selected target and container
    */
-  private boolean containsNode(
-      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
-    int index = 0;
-    Comparator<DatanodeUsageInfo> comparator =
-        DatanodeUsageInfo.getMostUsedByRemainingRatio();
-    int size = listToSearch.size();
-    if (size == 0) {
-      return false;
+  private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source,
+                                    List<DatanodeDetails> potentialTargets) {
+
+    NavigableSet<ContainerID> candidateContainers =
+        selectionCriteria.getCandidateContainers(source);
+
+    if (candidateContainers.isEmpty()) {
+      LOG.info("ContainerBalancer could not find any candidate containers for" +
+          " datanode {}", source.getUuidString());
+      return null;
+    }
+    LOG.info("ContainerBalancer is finding suitable target for source " +
+        "datanode {}", source.getUuidString());
+    ContainerMoveSelection moveSelection =
+        findTargetStrategy.findTargetForContainerMove(
+            source, potentialTargets, candidateContainers,
+            this::canSizeEnterTarget);
+
+    if (moveSelection == null) {
+      LOG.info("ContainerBalancer could not find a suitable target for " +
+          "source node {}.", source.getUuidString());
+      return null;
     }
 
-    if (comparator.compare(listToSearch.get(0),
-        listToSearch.get(size - 1)) < 0) {
-      index =
-          Collections.binarySearch(listToSearch, node, comparator.reversed());
-    } else {
-      index = Collections.binarySearch(listToSearch, node, comparator);
+    LOG.info("ContainerBalancer matched source datanode {} with target " +
+            "datanode {} for container move.", source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    return moveSelection;
+  }
+
+  /**
+   * Checks if limits maxDatanodesToBalance and maxSizeToMove have not been hit.
+   * @return true if conditions pass and balancing can continue, else false
+   */
+  private boolean checkConditionsForBalancing() {
+    if (countDatanodesBalanced + 2 > maxDatanodesToBalance) {
+      LOG.info("Hit max datanodes to balance limit. {} datanodes have already" +
+              " been balanced and the limit is {}.", countDatanodesBalanced,
+          maxDatanodesToBalance);
+      return false;
     }
-    return index >= 0 && listToSearch.get(index).equals(node);
+    if (totalSizeMoved + (long) ozoneConfiguration.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
+        StorageUnit.BYTES) > maxSizeToMove) {
+      LOG.info("Hit max size to move limit. {} bytes have already been moved " +
+          "and the limit is {} bytes.", totalSizeMoved, maxSizeToMove);
+      return false;
+    }

Review comment:
       We should also check that, after the move, the target and source are within the threshold of the average utilisation.
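
One reading of this check, sketched below: the move is acceptable if the source does not drop below `average - threshold` and the target does not rise above `average + threshold`, with utilisation computed as used / capacity as in `calculateUtilization`. `ThresholdCheck` and its parameters are hypothetical; the real code would take these values from `DatanodeUsageInfo` and the iteration's average.

```java
/** Hypothetical post-move threshold check for a single container move. */
final class ThresholdCheck {
  /**
   * Returns true if moving sizeBytes from source to target keeps the source
   * at or above (average - threshold) and the target at or below
   * (average + threshold).
   */
  static boolean moveKeepsNodesWithinThreshold(
      long sourceUsed, long sourceCapacity,
      long targetUsed, long targetCapacity,
      long sizeBytes, double averageUtilization, double threshold) {
    double sourceAfter = (double) (sourceUsed - sizeBytes) / sourceCapacity;
    double targetAfter = (double) (targetUsed + sizeBytes) / targetCapacity;
    return sourceAfter >= averageUtilization - threshold
        && targetAfter <= averageUtilization + threshold;
  }
}
```

Checking only the two bounds that a move can violate (source going too low, target going too high) still lets an over-utilised source be balanced over several moves.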

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -113,49 +130,50 @@ public boolean start(
     lock.lock();
     try {
       if (!balancerRunning.compareAndSet(false, true)) {
-        LOG.info("Container Balancer is already running.");
+        LOG.error("Container Balancer is already running.");
         return false;
       }
-      
+
+      ozoneConfiguration = new OzoneConfiguration();
       this.config = balancerConfiguration;
-      this.idleIteration = config.getIdleIteration();
-      this.threshold = config.getThreshold();
-      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
-      this.unBalancedNodes = new ArrayList<>();
       LOG.info("Starting Container Balancer...{}", this);
+
       //we should start a new balancer thread async
       //and response to cli as soon as possible
 
 
       //TODO: this is a temporary implementation
       //modify this later
-      currentBalancingThread = new Thread(() -> balance());
+      currentBalancingThread = new Thread(this::balance);
       currentBalancingThread.start();
       ////////////////////////
     } finally {
       lock.unlock();
     }
 
-
     return true;
   }
 
   /**
    * Balances the cluster.
    */
   private void balance() {
-    for (int i = 0; i < idleIteration; i++) {
-      if (!initializeIteration()) {
+    countDatanodesBalanced = 0;
+    totalSizeMoved = 0;
+    int i = 0;
+    do {
+      this.idleIteration = config.getIdleIteration();
+      this.threshold = config.getThreshold();
+      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
+      this.maxSizeToMove = config.getMaxSizeToMove();
+
+      if (!initializeIteration() || !doIteration()) {
         //balancer should be stopped immediately
         break;
       }
-      // unBalancedNodes is not cleared since the next iteration uses this
-      // iteration's unBalancedNodes to find out how many nodes were balanced
-      overUtilizedNodes.clear();
-      underUtilizedNodes.clear();
-      withinThresholdUtilizedNodes.clear();
-    }
+      i++;
+    } while (i < idleIteration);
+
     balancerRunning.compareAndSet(true, false);

Review comment:
       It would be better to call `stop()` here.
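
A simplified sketch of routing every exit of the balancing loop through a single `stop()`. It runs the loop on the calling thread and uses a hypothetical `StoppableBalancer` class, so it only illustrates the shape. One caveat it makes explicit: if the real `stop()` interrupts or joins `currentBalancingThread`, it must skip that step when it is called from the balancing thread itself.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical balancer whose loop always exits through stop(). */
final class StoppableBalancer {
  private final ReentrantLock lock = new ReentrantLock();
  private final AtomicBoolean running = new AtomicBoolean(false);
  private volatile Thread balancingThread;
  int iterationsRun;

  /** Runs the balancing loop on the calling thread (simplified). */
  void balance(int iterations) {
    lock.lock();
    try {
      if (!running.compareAndSet(false, true)) {
        return; // already running
      }
      balancingThread = Thread.currentThread();
    } finally {
      lock.unlock();
    }
    try {
      for (int i = 0; i < iterations && running.get(); i++) {
        iterationsRun++;
      }
    } finally {
      // Single shutdown path, whether the loop ends normally or throws.
      stop();
    }
  }

  /** Idempotent; safe to call from any thread, including the balancer's. */
  void stop() {
    lock.lock();
    try {
      running.set(false);
      Thread t = balancingThread;
      // Never interrupt (or join) the current thread from inside stop().
      if (t != null && t != Thread.currentThread()) {
        t.interrupt();
      }
      balancingThread = null;
    } finally {
      lock.unlock();
    }
  }

  boolean isRunning() {
    return running.get();
  }
}
```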

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * The selection criteria for selecting containers that will be moved and
+ * selecting datanodes that containers will move to.
+ */
+public class ContainerBalancerSelectionCriteria {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerBalancerSelectionCriteria.class);
+
+  private ContainerBalancerConfiguration balancerConfiguration;
+  private NodeManager nodeManager;
+  private ReplicationManager replicationManager;
+  private ContainerManagerV2 containerManagerV2;
+  private Set<ContainerID> selectedContainers;
+  private Set<ContainerID> excludeContainers;
+
+  public ContainerBalancerSelectionCriteria(
+      ContainerBalancerConfiguration balancerConfiguration,
+      NodeManager nodeManager,
+      ReplicationManager replicationManager,
+      ContainerManagerV2 containerManagerV2) {
+    this.balancerConfiguration = balancerConfiguration;
+    this.nodeManager = nodeManager;
+    this.replicationManager = replicationManager;
+    this.containerManagerV2 = containerManagerV2;
+    selectedContainers = new HashSet<>();
+    excludeContainers = balancerConfiguration.getExcludeContainers();
+  }
+
+  /**
+   * Checks whether container is currently undergoing replication or deletion.
+   *
+   * @param containerID Container to check.
+   * @return true if container is replicating or deleting, otherwise false.
+   */
+  private boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
+    return replicationManager.isContainerReplicatingOrDeleting(containerID);
+  }
+
+  /**
+   * Gets containers that are suitable for moving based on the following
+   * required criteria:
+   * 1. Container must not be undergoing replication.
+   * 2. Container must not already be selected for balancing.
+   * 3. Container size should be closer to 5GB.
+   * 4. Container must not be in the configured exclude containers list.
+   * 5. Container should be closed.
+   *
+   * @param node DatanodeDetails for which to find candidate containers.
+   * @return NavigableSet of candidate containers that satisfy the criteria.
+   */
+  public NavigableSet<ContainerID> getCandidateContainers(
+      DatanodeDetails node) {
+    NavigableSet<ContainerID> containerIDSet =
+        new TreeSet<>(orderContainersByUsedBytes());
+    try {
+      containerIDSet.addAll(nodeManager.getContainers(node));
+    } catch (NodeNotFoundException e) {
+      LOG.warn("Could not find Datanode {} while selecting candidate " +
+          "containers for Container Balancer.", node.toString(), e);
+      return containerIDSet;
+    }
+    if (excludeContainers != null) {
+      containerIDSet.removeAll(excludeContainers);
+    }
+    if (selectedContainers != null) {
+      containerIDSet.removeAll(selectedContainers);
+    }
+
+    // remove not closed containers
+    containerIDSet.removeIf(containerID -> {
+      try {
+        return containerManagerV2.getContainer(containerID).getState() !=
+            HddsProtos.LifeCycleState.CLOSED;
+      } catch (ContainerNotFoundException e) {
+        LOG.warn("Could not retrieve ContainerInfo for container {} for " +
+            "checking LifecycleState in ContainerBalancer. Excluding this " +
+            "container.", containerID.toString(), e);
+        return true;
+      }
+    });
+
+    containerIDSet.removeIf(this::isContainerReplicatingOrDeleting);
+    return containerIDSet;
+  }
+
+  /**
+   * Checks if the first container has more used space than second.
+   * @param first first container to compare
+   * @param second second container to compare
+   * @return An integer greater than 0 if first is more used, 0 if they're
+   * the same containers or a container is not found, and a value less than 0
+   * if first is not more used than second.
+   */
+  private int isContainerMoreUsed(ContainerID first,

Review comment:
       I think less utilised containers would come earlier in the set.
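
A `TreeSet` iterates in ascending comparator order, so a comparator that ranks "more used" as greater puts the least used containers first. A `TreeSet` also treats elements as duplicates whenever its comparator returns 0, so containers with equal used bytes need a tie-breaker or they are silently dropped. A sketch of a most-used-first ordering, assuming container IDs and used bytes are available as `long` values (`ContainerOrdering` is a hypothetical helper; the real code would use `ContainerID` and `ContainerInfo`):

```java
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical ordering of candidate containers for the balancer. */
final class ContainerOrdering {
  /**
   * Orders container IDs by used bytes, most used first, breaking ties by
   * ID so containers with equal usage are not dropped by the TreeSet.
   */
  static NavigableSet<Long> mostUsedFirst(Map<Long, Long> usedBytesById) {
    Comparator<Long> byUsedBytes = Comparator.comparingLong(usedBytesById::get);
    NavigableSet<Long> ordered = new TreeSet<>(
        byUsedBytes.reversed().thenComparing(Comparator.<Long>naturalOrder()));
    ordered.addAll(usedBytesById.keySet());
    return ordered;
  }
}
```

Alternatively, the existing ascending set could be kept and traversed with `descendingSet()` when the most used containers are wanted first.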

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
##########
@@ -141,18 +200,231 @@ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
    */
   @Test
   public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
-    balancerConfiguration.setMaxDatanodesToBalance(2);
+    balancerConfiguration.setMaxDatanodesToBalance(4);
+    balancerConfiguration.setThreshold(0.01);
+    containerBalancer.start(balancerConfiguration);
+
+    // waiting for balance completed.
+    // TODO: this is a temporary implementation for now
+    // modify this after balancer is fully completed
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {}
+
+    Assert.assertFalse(containerBalancer.isBalancerRunning());

Review comment:
       Can we also add an assertion to the test to verify that 4 datanodes have been balanced?
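
Since `checkConditionsForBalancing` counts two datanodes per move, one way to assert this is to count the distinct sources and targets in the selected moves. A sketch, assuming the source-to-target pairs are reduced to datanode UUID strings (`BalancedDatanodeCount` is a hypothetical helper; in the test the pairs would come from `getSourceToTargetMap()`):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for counting datanodes involved in planned moves. */
final class BalancedDatanodeCount {
  /** Counts distinct datanodes appearing as either a source or a target. */
  static int countInvolvedDatanodes(Map<String, String> sourceToTarget) {
    Set<String> nodes = new HashSet<>(sourceToTarget.keySet());
    nodes.addAll(sourceToTarget.values());
    return nodes.size();
  }
}
```

With `setMaxDatanodesToBalance(4)`, the test could then assert that this count equals 4 (or is at most 4, if fewer nodes might be selected).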

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
##########
@@ -136,14 +158,49 @@ public long getMaxSizeToMove() {
   }
 
   /**
-   * Sets the value of maximum size that will be moved by Container Balancer.
+   * Sets the value of maximum size that will be moved by Container Balancer
+   * in one iteration.
    *
    * @param maxSizeToMove maximum number of Bytes
    */
   public void setMaxSizeToMove(long maxSizeToMove) {
     this.maxSizeToMove = maxSizeToMove;
   }
 
+  public long getMaxSizeEnteringTarget() {
+    return maxSizeEnteringTarget;
+  }
+
+  public void setMaxSizeEnteringTarget(long maxSizeEnteringTarget) {
+    this.maxSizeEnteringTarget = maxSizeEnteringTarget;
+  }
+
+  public long getMaxSizeLeavingSource() {
+    return maxSizeLeavingSource;
+  }
+
+  public void setMaxSizeLeavingSource(long maxSizeLeavingSource) {
+    this.maxSizeLeavingSource = maxSizeLeavingSource;
+  }
+
+  public Set<ContainerID> getExcludeContainers() {
+    if (excludeContainers.isEmpty()) {
+      return new HashSet<>();
+    }
+    return Arrays.stream(excludeContainers.split(", |,"))

Review comment:
       We can split it by ',' and then trim the whitespace before parsing each long value.
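
A sketch of that parsing, returning plain `Long` values to stay self-contained (in Ozone each value would presumably then be wrapped with `ContainerID.valueOf`). `ExcludeContainersParser` is a hypothetical name. Unlike the `", |,"` regex, trimming also handles tabs, multiple spaces and trailing spaces, and filtering empty tokens tolerates an empty string or stray commas.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical parser for the comma-separated exclude-containers setting. */
final class ExcludeContainersParser {
  static Set<Long> parse(String excludeContainers) {
    return Arrays.stream(excludeContainers.split(","))
        .map(String::trim)             // drop surrounding whitespace
        .filter(s -> !s.isEmpty())     // ignore empty input and stray commas
        .map(Long::parseLong)          // NumberFormatException on bad input
        .collect(Collectors.toSet());
  }
}
```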

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +
+        " balanced.");
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
+    return true;
+  }
+
+  private boolean doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();

Review comment:
       We will need to support moving multiple containers from a source datanode.
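
A greedy sketch of selecting several containers from one source, bounded by a per-source limit like the `maxSizeLeavingSource` setting added in this PR. It assumes candidates are given as used-byte sizes in the order they should be tried. `MultiMoveSelection.selectFromSource` is hypothetical and ignores target matching, which the real loop would still do per container (for example via `matchSourceWithTarget` and `canSizeEnterTarget`).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical multi-container selection for a single source datanode. */
final class MultiMoveSelection {
  /**
   * Returns the indexes of containers selected from one source, skipping
   * any container that would push the total past maxSizeLeavingSource.
   */
  static List<Integer> selectFromSource(long[] containerSizes,
                                        long maxSizeLeavingSource) {
    List<Integer> selected = new ArrayList<>();
    long leaving = 0;
    for (int i = 0; i < containerSizes.length; i++) {
      if (leaving + containerSizes[i] > maxSizeLeavingSource) {
        continue; // does not fit, but a smaller later container still might
      }
      leaving += containerSizes[i];
      selected.add(i);
    }
    return selected;
  }
}
```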

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -113,49 +130,50 @@ public boolean start(
     lock.lock();
     try {
       if (!balancerRunning.compareAndSet(false, true)) {
-        LOG.info("Container Balancer is already running.");
+        LOG.error("Container Balancer is already running.");
         return false;
       }
-      
+
+      ozoneConfiguration = new OzoneConfiguration();
       this.config = balancerConfiguration;
-      this.idleIteration = config.getIdleIteration();
-      this.threshold = config.getThreshold();
-      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
-      this.unBalancedNodes = new ArrayList<>();
       LOG.info("Starting Container Balancer...{}", this);
+
       //we should start a new balancer thread async
       //and response to cli as soon as possible
 
 
       //TODO: this is a temporary implementation
       //modify this later
-      currentBalancingThread = new Thread(() -> balance());
+      currentBalancingThread = new Thread(this::balance);
       currentBalancingThread.start();
       ////////////////////////
     } finally {
       lock.unlock();
     }
 
-
     return true;
   }
 
   /**
    * Balances the cluster.
    */
   private void balance() {
-    for (int i = 0; i < idleIteration; i++) {
-      if (!initializeIteration()) {
+    countDatanodesBalanced = 0;
+    totalSizeMoved = 0;
+    int i = 0;
+    do {
+      this.idleIteration = config.getIdleIteration();
+      this.threshold = config.getThreshold();
+      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
+      this.maxSizeToMove = config.getMaxSizeToMove();
+
+      if (!initializeIteration() || !doIteration()) {

Review comment:
       If `initializeIteration` and `doIteration` are called in every iteration, it is better to put them in separate statements.
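A minimal sketch of the loop with the two calls split, assuming stub versions of `initializeIteration()` and `doIteration()` that only count calls; `BalanceLoopSketch` and `iterationsWithWork` are hypothetical. Behavior matches the short-circuiting `||` in the diff: `doIteration()` runs only when initialization succeeds.

```java
/**
 * Hypothetical sketch of balance() with initializeIteration() and
 * doIteration() as separate statements. Both methods are stubs that only
 * count calls; they stand in for the real ContainerBalancer methods.
 */
class BalanceLoopSketch {
  private final int idleIteration;
  private final int iterationsWithWork; // stub: iterations that find work
  int initCalls = 0;
  int doCalls = 0;

  BalanceLoopSketch(int idleIteration, int iterationsWithWork) {
    this.idleIteration = idleIteration;
    this.iterationsWithWork = iterationsWithWork;
  }

  private boolean initializeIteration() {
    initCalls++;
    // stub: report unbalanced datanodes only for the first few iterations
    return initCalls <= iterationsWithWork;
  }

  private boolean doIteration() {
    doCalls++;
    return true; // stub: no balancing limit is ever hit
  }

  void balance() {
    for (int i = 0; i < idleIteration; i++) {
      // One statement per step makes it clear which step ended balancing
      // and leaves room for step-specific logging or metrics.
      if (!initializeIteration()) {
        System.out.println("No unbalanced datanodes; stopping.");
        break;
      }
      if (!doIteration()) {
        System.out.println("Balancing limits hit; stopping.");
        break;
      }
    }
  }

  public static void main(String[] args) {
    BalanceLoopSketch sketch = new BalanceLoopSketch(5, 2);
    sketch.balance();
    System.out.println(sketch.initCalls + " " + sketch.doCalls); // 3 2
  }
}
```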

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -351,6 +491,87 @@ public static double calculateUtilization(
         stat.getCapacity().get().doubleValue();
   }
 
+  boolean canSizeLeaveSource(DatanodeDetails source, long size) {
+    if (sizeLeavingNode.containsKey(source)) {
+      return sizeLeavingNode.get(source) + size <=
+          config.getMaxSizeLeavingSource();
+    }
+    return false;
+  }
+
+  /**
+   * Checks if specified size can enter specified target datanode
+   * according to configuration.
+   * @param target target datanode in which size is entering
+   * @param size size in bytes
+   * @return true if size can enter target, else false
+   */
+  boolean canSizeEnterTarget(DatanodeDetails target,
+                                    long size) {
+    if (sizeEnteringNode.containsKey(target)) {
+      return sizeEnteringNode.get(target) + size <=
+          config.getMaxSizeEnteringTarget();
+    }
+    return false;
+  }
+
+  /**
+   * Get potential targets for container move. Potential targets are under
+   * utilized and within threshold utilized nodes.
+   * @return A list of potential target DatanodeDetails.
+   */
+  private List<DatanodeDetails> getPotentialTargets() {
+    List<DatanodeDetails> potentialTargets = new ArrayList<>(
+        underUtilizedNodes.size() + withinThresholdUtilizedNodes.size());
+
+    underUtilizedNodes
+        .forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
+    withinThresholdUtilizedNodes
+        .forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
+    return potentialTargets;
+  }
+
+  /**
+   * Updates conditions for balancing, such as total size moved by balancer,
+   * total size that has entered a datanode, and total size that has left a
+   * datanode in this iteration.
+   * @param source source datanode
+   * @param moveSelection selected target datanode and container
+   */
+  private void incSizeSelectedForMoving(DatanodeDetails source,
+                                       ContainerMoveSelection moveSelection) {
+    DatanodeDetails target =
+        moveSelection.getTargetNode();
+    ContainerInfo container;
+    try {
+      container =
+          containerManager.getContainer(moveSelection.getContainerID());
+    } catch (ContainerNotFoundException e) {
+      LOG.warn("Could not find Container {} while matching source and " +
+              "target nodes in ContainerBalancer",
+          moveSelection.getContainerID(), e);
+      return;
+    }
+    long size = container.getUsedBytes();
+    totalSizeMoved += size;
+
+    // update sizeLeavingNode map with the recent moveSelection
+    if (sizeLeavingNode.containsKey(source)) {

Review comment:
       The `containsKey` check should not be required for `sizeLeavingNode` and `sizeEnteringNode`, since these maps are already initialised.
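A minimal sketch of the update without the guard, assuming both maps are pre-filled with `0L` as in `initializeIteration()`. Datanodes are plain strings here, and `SizeTrackingSketch`, `leaving`, and `entering` are hypothetical names; `Map.merge` is the standard `java.util.Map` method.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of size bookkeeping without containsKey() checks. */
class SizeTrackingSketch {
  private final Map<String, Long> sizeLeavingNode = new HashMap<>();
  private final Map<String, Long> sizeEnteringNode = new HashMap<>();

  SizeTrackingSketch(Iterable<String> sources, Iterable<String> targets) {
    // Every possible source and target starts at 0 bytes.
    sources.forEach(s -> sizeLeavingNode.put(s, 0L));
    targets.forEach(t -> sizeEnteringNode.put(t, 0L));
  }

  void incSizeSelectedForMoving(String source, String target, long size) {
    // No containsKey() guard: the keys already exist, so merge() just adds
    // size to the current total.
    sizeLeavingNode.merge(source, size, Long::sum);
    sizeEnteringNode.merge(target, size, Long::sum);
  }

  long leaving(String node) {
    return sizeLeavingNode.get(node);
  }

  long entering(String node) {
    return sizeEnteringNode.get(node);
  }

  public static void main(String[] args) {
    SizeTrackingSketch t = new SizeTrackingSketch(
        Arrays.asList("dn-over"), Arrays.asList("dn-under"));
    t.incSizeSelectedForMoving("dn-over", "dn-under", 5L);
    t.incSizeSelectedForMoving("dn-over", "dn-under", 3L);
    System.out.println(t.leaving("dn-over") + " " + t.entering("dn-under")); // 8 8
  }
}
```

One design note: `merge` would also silently add a node that was never registered, whereas `put(source, sizeLeavingNode.get(source) + size)` would fail fast with a `NullPointerException`; either form drops the redundant check.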

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +
+        " balanced.");
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
+    return true;
+  }
+
+  private boolean doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      if (!checkConditionsForBalancing()) {
+        LOG.info("Conditions for balancing failed. Stopping Container " +
+            "Balancer...");
         return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" +
-            " balanced.");
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        // move container
+        // if move successful, do the following
+        potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+            selectedTargets, moveSelection, source);
+      }
+    }
+
+    // if not all underUtilized nodes have been selected, try to match
+    // withinThresholdUtilized nodes with underUtilized nodes
+    if (selectedTargets.size() < underUtilizedNodes.size()) {
+      potentialTargets.removeAll(selectedTargets);
+      Collections.reverse(withinThresholdUtilizedNodes);
+
+      for (DatanodeUsageInfo datanodeUsageInfo : withinThresholdUtilizedNodes) {
+        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+        if (!checkConditionsForBalancing()) {
+          LOG.info("Conditions for balancing failed. Stopping Container " +
+              "Balancer...");
+          return false;
+        }
+
+        ContainerMoveSelection moveSelection = matchSourceWithTarget(source,
+            potentialTargets);
+
+        if (moveSelection != null) {
+          LOG.info("ContainerBalancer is trying to move container {} from " +
+                  "source datanode {} to target datanode {}",
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
+              moveSelection.getTargetNode().getUuidString());
+
+          // move container
+          // if move successful, do the following
+          potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+              selectedTargets, moveSelection, source);
+        }
       }
     }
     return true;
   }
 
   /**
-   * Performs binary search to determine if the specified listToSearch
-   * contains the specified node.
-   *
-   * @param listToSearch List of DatanodeUsageInfo to be searched.
-   * @param node DatanodeUsageInfo to be searched for.
-   * @return true if the specified node is present in listToSearch, otherwise
-   * false.
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   * @param potentialTargets List of potential targets to move container to
+   * @return ContainerMoveSelection containing the selected target and container
    */
-  private boolean containsNode(
-      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
-    int index = 0;
-    Comparator<DatanodeUsageInfo> comparator =
-        DatanodeUsageInfo.getMostUsedByRemainingRatio();
-    int size = listToSearch.size();
-    if (size == 0) {
-      return false;
+  private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source,
+                                    List<DatanodeDetails> potentialTargets) {
+
+    NavigableSet<ContainerID> candidateContainers =
+        selectionCriteria.getCandidateContainers(source);
+
+    if (candidateContainers.isEmpty()) {
+      LOG.info("ContainerBalancer could not find any candidate containers for" +
+          " datanode {}", source.getUuidString());
+      return null;
+    }
+    LOG.info("ContainerBalancer is finding suitable target for source " +
+        "datanode {}", source.getUuidString());
+    ContainerMoveSelection moveSelection =
+        findTargetStrategy.findTargetForContainerMove(
+            source, potentialTargets, candidateContainers,
+            this::canSizeEnterTarget);
+
+    if (moveSelection == null) {
+      LOG.info("ContainerBalancer could not find a suitable target for " +
+          "source node {}.", source.getUuidString());
+      return null;
     }
 
-    if (comparator.compare(listToSearch.get(0),
-        listToSearch.get(size - 1)) < 0) {
-      index =
-          Collections.binarySearch(listToSearch, node, comparator.reversed());
-    } else {
-      index = Collections.binarySearch(listToSearch, node, comparator);
+    LOG.info("ContainerBalancer matched source datanode {} with target " +
+            "datanode {} for container move.", source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    return moveSelection;
+  }
+
+  /**
+   * Checks if limits maxDatanodesToBalance and maxSizeToMove have not been hit.
+   * @return true if conditions pass and balancing can continue, else false
+   */
+  private boolean checkConditionsForBalancing() {
+    if (countDatanodesBalanced + 2 > maxDatanodesToBalance) {
+      LOG.info("Hit max datanodes to balance limit. {} datanodes have already" +
+              " been balanced and the limit is {}.", countDatanodesBalanced,
+          maxDatanodesToBalance);
+      return false;
     }
-    return index >= 0 && listToSearch.get(index).equals(node);
+    if (totalSizeMoved + (long) ozoneConfiguration.getStorageSize(

Review comment:
       Can we check the container size directly?
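One reading of this suggestion, sketched under assumptions: enforce `maxSizeToMove` using the used bytes of the container actually selected (the diff already reads `container.getUsedBytes()` in `incSizeSelectedForMoving`) instead of adding a size taken from configuration (the exact call is truncated above). `SizeLimitSketch` and `canMoveContainer` are hypothetical names.

```java
/**
 * Hypothetical sketch: check maxSizeToMove against the used bytes of the
 * selected container rather than a fixed size read from configuration.
 */
class SizeLimitSketch {
  static final long GB = 1L << 30;

  /**
   * @param totalSizeMoved bytes already selected for moving
   * @param containerUsedBytes used bytes of the selected container
   * @param maxSizeToMove configured limit on total bytes to move
   * @return true if moving this container keeps the total within the limit
   */
  static boolean canMoveContainer(long totalSizeMoved,
      long containerUsedBytes, long maxSizeToMove) {
    return totalSizeMoved + containerUsedBytes <= maxSizeToMove;
  }

  public static void main(String[] args) {
    // 8 GB already moved, 10 GB limit: a container holding 1 GB still fits,
    // while one holding 5 GB does not.
    System.out.println(canMoveContainer(8 * GB, 1 * GB, 10 * GB)); // true
    System.out.println(canMoveContainer(8 * GB, 5 * GB, 10 * GB)); // false
  }
}
```

Because `checkConditionsForBalancing()` runs before a container is chosen, checking the real size would presumably mean applying this test after `matchSourceWithTarget` returns a selection, or folding it into the selection criteria.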




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


