Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/08/04 03:12:10 UTC

[GitHub] [ozone] JacksonYao287 commented on a change in pull request #2441: HDDS-4929. Select target datanodes and containers to move for Container Balancer

JacksonYao287 commented on a change in pull request #2441:
URL: https://github.com/apache/ozone/pull/2441#discussion_r682220995



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -84,79 +114,90 @@ public ContainerBalancer(
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
       OzoneConfiguration ozoneConfiguration,
-      final SCMContext scmContext) {
+      final SCMContext scmContext,
+      PlacementPolicy placementPolicy) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.config = new ContainerBalancerConfiguration();
+    this.config = new ContainerBalancerConfiguration(ozoneConfiguration);
     this.metrics = new ContainerBalancerMetrics();
     this.scmContext = scmContext;
+    this.placementPolicy = placementPolicy;
 
-    this.clusterCapacity = 0L;
-    this.clusterUsed = 0L;
-    this.clusterRemaining = 0L;
-
-    this.overUtilizedNodes = new ArrayList<>();
-    this.underUtilizedNodes = new ArrayList<>();
-    this.unBalancedNodes = new ArrayList<>();
-    this.withinThresholdUtilizedNodes = new ArrayList<>();
     this.lock = new ReentrantLock();
+    findTargetStrategy =
+        new FindTargetGreedy(containerManager, placementPolicy);
   }
+
   /**
    * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public boolean start(
-      ContainerBalancerConfiguration balancerConfiguration) {
-    lock.lock();
-    try {
-      if (!balancerRunning.compareAndSet(false, true)) {
-        LOG.info("Container Balancer is already running.");
-        return false;
-      }
-      
-      this.config = balancerConfiguration;
-      this.idleIteration = config.getIdleIteration();
-      this.threshold = config.getThreshold();
-      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
-      this.unBalancedNodes = new ArrayList<>();
-      LOG.info("Starting Container Balancer...{}", this);
-      //we should start a new balancer thread async
-      //and response to cli as soon as possible
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {

Review comment:
      After thinking about this more, I think we should add the lock back. Although `compareAndSet` prevents two concurrent `start` calls (or two concurrent `stop` calls), it cannot prevent a `start` racing with a `stop`. So I think the solution is to use one lock to protect both the `start` and `stop` operations. Since `balancerRunning` is then also protected by this lock, we can make it a `volatile boolean`, which also works well for `isBalancerRunning`.
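A minimal sketch of what I have in mind, assuming a stripped-down stand-alone class (`BalancerLifecycleSketch` and its placeholder `balance()` loop are hypothetical, not the PR's code). Both `start` and `stop` take the same lock, so they can no longer interleave, and `isBalancerRunning()` reads the `volatile` flag without locking:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified balancer: start/stop guarded by a single lock. */
class BalancerLifecycleSketch {
  private final Lock lock = new ReentrantLock();
  // Written only while holding `lock`; volatile so lock-free readers see updates.
  private volatile boolean balancerRunning = false;
  private Thread currentBalancingThread;

  public boolean start() {
    lock.lock();
    try {
      if (balancerRunning) {
        return false; // already running
      }
      balancerRunning = true;
      currentBalancingThread = new Thread(this::balance);
      currentBalancingThread.setDaemon(true);
      currentBalancingThread.start();
      return true;
    } finally {
      lock.unlock();
    }
  }

  public void stop() {
    lock.lock();
    try {
      if (!balancerRunning) {
        return; // nothing to stop
      }
      balancerRunning = false;
      currentBalancingThread.interrupt();
    } finally {
      lock.unlock();
    }
  }

  public boolean isBalancerRunning() {
    return balancerRunning; // plain volatile read, no lock needed
  }

  // Placeholder for the real balancing work: loops until stopped or interrupted.
  private void balance() {
    while (balancerRunning && !Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // makes the loop exit
      }
    }
  }
}
```

One caveat: this `stop()` only interrupts the balancing thread. If it also joined that thread, it must not be called from inside `balance()` itself (as the PR does when `initializeIteration()` fails), because the thread would then wait for itself forever.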

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Find a target giving preference to more under-utilized nodes.
+ */
+public class FindTargetGreedy implements FindTargetStrategy {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FindTargetGreedy.class);
+
+  private ContainerManagerV2 containerManager;
+  private PlacementPolicy placementPolicy;
+
+  public FindTargetGreedy(
+      ContainerManagerV2 containerManager,
+      PlacementPolicy placementPolicy) {
+    this.containerManager = containerManager;
+    this.placementPolicy = placementPolicy;
+  }
+
+  /**
+   * Find a {@link ContainerMoveSelection} consisting of a target and
+   * container to move for a source datanode. Favours more under-utilized nodes.
+   * @param source Datanode to find a target for
+   * @param potentialTargets Collection of potential target datanodes
+   * @param candidateContainers Set of candidate containers satisfying
+   *                            selection criteria
+   *                            {@link ContainerBalancerSelectionCriteria}
+   * @param canSizeEnterTarget A functional interface whose apply
+   * (DatanodeDetails, Long) method returns true if the size specified in the
+   * second argument can enter the specified DatanodeDetails node
+   * @return Found target and container
+   */
+  @Override
+  public ContainerMoveSelection findTargetForContainerMove(
+      DatanodeDetails source, Collection<DatanodeDetails> potentialTargets,
+      Set<ContainerID> candidateContainers,
+      BiFunction<DatanodeDetails, Long, Boolean> canSizeEnterTarget) {
+    for (DatanodeDetails target : potentialTargets) {
+      for (ContainerID container : candidateContainers) {
+        Set<ContainerReplica> replicas;
+        ContainerInfo containerInfo;
+
+        try {
+          replicas = containerManager.getContainerReplicas(container);
+          containerInfo = containerManager.getContainer(container);
+        } catch (ContainerNotFoundException e) {
+          LOG.warn("Could not get Container {} from Container Manager for " +
+              "obtaining replicas in Container Balancer.", container, e);
+          continue;
+        }
+
+        if (replicas.stream().noneMatch(
+            replica -> replica.getDatanodeDetails().equals(target)) &&
+            containerMoveSatisfiesPlacementPolicy(container, replicas, source,
+            target) &&
+            canSizeEnterTarget.apply(target, containerInfo.getUsedBytes())) {

Review comment:
      Maybe later we can take the network topology into account: among all the candidate targets, we could always choose the one nearest to the source in `networkTopology`. This can be done in a new JIRA, and I can help with it.
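A rough sketch of the idea under a simplified model: `Node` is a hypothetical stand-in for `DatanodeDetails`, and `distance` is a Hadoop-style hop count computed from network-location strings such as `/dc1/rack1`. In Ozone the distance would presumably come from the cluster's network topology instead of this helper.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch: pick the candidate target closest to the source. */
final class NearestTargetSketch {
  /** Stand-in for DatanodeDetails: a host name plus a location like "/dc1/rack1". */
  static final class Node {
    final String name;
    final String location;

    Node(String name, String location) {
      this.name = name;
      this.location = location;
    }
  }

  /**
   * Hops from each node up to their closest common ancestor, e.g. 2 for the
   * same rack and 4 for another rack in the same data center.
   */
  static int distance(Node a, Node b) {
    String[] pa = (a.location + "/" + a.name).substring(1).split("/");
    String[] pb = (b.location + "/" + b.name).substring(1).split("/");
    int common = 0;
    while (common < pa.length && common < pb.length
        && pa[common].equals(pb[common])) {
      common++;
    }
    return (pa.length - common) + (pb.length - common);
  }

  /** Among candidates that already passed all other checks, choose the nearest. */
  static Optional<Node> nearest(Node source, List<Node> candidates) {
    return candidates.stream()
        .min(Comparator.comparingInt(c -> distance(source, c)));
  }
}
```

Filtering first and taking the minimum afterwards keeps the existing checks (no replica already on the target, placement policy, size limit) in charge, so distance only decides among targets that are already valid.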

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -84,79 +114,90 @@ public ContainerBalancer(
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
       OzoneConfiguration ozoneConfiguration,
-      final SCMContext scmContext) {
+      final SCMContext scmContext,
+      PlacementPolicy placementPolicy) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.config = new ContainerBalancerConfiguration();
+    this.config = new ContainerBalancerConfiguration(ozoneConfiguration);
     this.metrics = new ContainerBalancerMetrics();
     this.scmContext = scmContext;
+    this.placementPolicy = placementPolicy;
 
-    this.clusterCapacity = 0L;
-    this.clusterUsed = 0L;
-    this.clusterRemaining = 0L;
-
-    this.overUtilizedNodes = new ArrayList<>();
-    this.underUtilizedNodes = new ArrayList<>();
-    this.unBalancedNodes = new ArrayList<>();
-    this.withinThresholdUtilizedNodes = new ArrayList<>();
     this.lock = new ReentrantLock();
+    findTargetStrategy =
+        new FindTargetGreedy(containerManager, placementPolicy);
   }
+
   /**
    * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public boolean start(
-      ContainerBalancerConfiguration balancerConfiguration) {
-    lock.lock();
-    try {
-      if (!balancerRunning.compareAndSet(false, true)) {
-        LOG.info("Container Balancer is already running.");
-        return false;
-      }
-      
-      this.config = balancerConfiguration;
-      this.idleIteration = config.getIdleIteration();
-      this.threshold = config.getThreshold();
-      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
-      this.unBalancedNodes = new ArrayList<>();
-      LOG.info("Starting Container Balancer...{}", this);
-      //we should start a new balancer thread async
-      //and response to cli as soon as possible
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
+    ozoneConfiguration = new OzoneConfiguration();
+    this.config = balancerConfiguration;
+    LOG.info("Starting Container Balancer...{}", this);
 
-      //TODO: this is a temporary implementation
-      //modify this later
-      currentBalancingThread = new Thread(() -> balance());
-      currentBalancingThread.start();
-      ////////////////////////
-    } finally {
-      lock.unlock();
-    }
+    //we should start a new balancer thread async
+    //and response to cli as soon as possible
 
 
+    //TODO: this is a temporary implementation
+    //modify this later
+    currentBalancingThread = new Thread(this::balance);
+    currentBalancingThread.start();
+    ////////////////////////
     return true;
   }
 
   /**
    * Balances the cluster.
    */
   private void balance() {
+    this.idleIteration = config.getIdleIteration();
     for (int i = 0; i < idleIteration; i++) {
+      countDatanodesInvolvedPerIteration = 0;
+      sizeMovedPerIteration = 0;
+      this.threshold = config.getThreshold();
+      this.maxDatanodesToInvolvePerIteration =
+          config.getMaxDatanodesToInvolvePerIteration();
+      this.maxSizeToMovePerIteration = config.getMaxSizeToMovePerIteration();

Review comment:
      As you mentioned, I am not sure why these parameters would change while the balancer is running. I think they only need to be set once per balancing thread, rather than at the start of every iteration. Please correct me if I am wrong.
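A minimal sketch of what I mean, assuming the settings are copied into an immutable snapshot when the balancing thread is set up (`BalanceLoopSketch` and `Settings` are hypothetical names). Only the per-iteration counters are reset inside the loop:

```java
/** Hypothetical sketch: balancer settings captured once per balancing thread. */
final class BalanceLoopSketch {
  /** Immutable snapshot, so the values cannot change while the thread runs. */
  static final class Settings {
    final int idleIterations;
    final double threshold;
    final double maxDatanodesRatioToInvolvePerIteration;
    final long maxSizeToMovePerIteration;

    Settings(int idleIterations, double threshold,
        double maxDatanodesRatioToInvolvePerIteration,
        long maxSizeToMovePerIteration) {
      this.idleIterations = idleIterations;
      this.threshold = threshold;
      this.maxDatanodesRatioToInvolvePerIteration =
          maxDatanodesRatioToInvolvePerIteration;
      this.maxSizeToMovePerIteration = maxSizeToMovePerIteration;
    }
  }

  private final Settings settings;
  private int countDatanodesInvolvedPerIteration;
  private long sizeMovedPerIteration;
  private int iterationsRun;

  /** The settings are read once, when the balancing thread is created. */
  BalanceLoopSketch(Settings settings) {
    this.settings = settings;
  }

  void balance() {
    for (int i = 0; i < settings.idleIterations; i++) {
      // Only the per-iteration counters are reset inside the loop.
      countDatanodesInvolvedPerIteration = 0;
      sizeMovedPerIteration = 0;
      // initializeIteration()/doIteration() would read settings.threshold etc.
      iterationsRun++;
    }
  }

  int iterationsRun() {
    return iterationsRun;
  }
}
```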

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -84,79 +114,90 @@ public ContainerBalancer(
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
       OzoneConfiguration ozoneConfiguration,
-      final SCMContext scmContext) {
+      final SCMContext scmContext,
+      PlacementPolicy placementPolicy) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.config = new ContainerBalancerConfiguration();
+    this.config = new ContainerBalancerConfiguration(ozoneConfiguration);
     this.metrics = new ContainerBalancerMetrics();
     this.scmContext = scmContext;
+    this.placementPolicy = placementPolicy;
 
-    this.clusterCapacity = 0L;
-    this.clusterUsed = 0L;
-    this.clusterRemaining = 0L;
-
-    this.overUtilizedNodes = new ArrayList<>();
-    this.underUtilizedNodes = new ArrayList<>();
-    this.unBalancedNodes = new ArrayList<>();
-    this.withinThresholdUtilizedNodes = new ArrayList<>();
     this.lock = new ReentrantLock();
+    findTargetStrategy =
+        new FindTargetGreedy(containerManager, placementPolicy);
   }
+
   /**
    * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public boolean start(
-      ContainerBalancerConfiguration balancerConfiguration) {
-    lock.lock();
-    try {
-      if (!balancerRunning.compareAndSet(false, true)) {
-        LOG.info("Container Balancer is already running.");
-        return false;
-      }
-      
-      this.config = balancerConfiguration;
-      this.idleIteration = config.getIdleIteration();
-      this.threshold = config.getThreshold();
-      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
-      this.unBalancedNodes = new ArrayList<>();
-      LOG.info("Starting Container Balancer...{}", this);
-      //we should start a new balancer thread async
-      //and response to cli as soon as possible
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
+    ozoneConfiguration = new OzoneConfiguration();
+    this.config = balancerConfiguration;
+    LOG.info("Starting Container Balancer...{}", this);
 
-      //TODO: this is a temporary implementation
-      //modify this later
-      currentBalancingThread = new Thread(() -> balance());
-      currentBalancingThread.start();
-      ////////////////////////
-    } finally {
-      lock.unlock();
-    }
+    //we should start a new balancer thread async
+    //and response to cli as soon as possible
 
 
+    //TODO: this is a temporary implementation
+    //modify this later
+    currentBalancingThread = new Thread(this::balance);
+    currentBalancingThread.start();
+    ////////////////////////
     return true;
   }
 
   /**
    * Balances the cluster.
    */
   private void balance() {
+    this.idleIteration = config.getIdleIteration();
     for (int i = 0; i < idleIteration; i++) {
+      countDatanodesInvolvedPerIteration = 0;
+      sizeMovedPerIteration = 0;
+      this.threshold = config.getThreshold();
+      this.maxDatanodesToInvolvePerIteration =
+          config.getMaxDatanodesToInvolvePerIteration();
+      this.maxSizeToMovePerIteration = config.getMaxSizeToMovePerIteration();
+
+      // stop balancing if iteration is not initialized
       if (!initializeIteration()) {
-        //balancer should be stopped immediately
-        break;
+        stop();
+        return;
+      }
+      doIteration();
+      if (!isBalancerRunning()) {
+        return;
+      }
+
+      // wait for configured time before starting next iteration, unless
+      // this was the final iteration
+      if (i != idleIteration - 1) {
+        synchronized (this) {

Review comment:
      Why do we need `synchronized` here?
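For reference, and assuming the synchronized block calls `wait(timeout)` to pause between iterations (its body is not shown in this hunk), `synchronized` would be required there: `Object.wait` must be called by a thread that holds the object's monitor, otherwise it throws `IllegalMonitorStateException`. A minimal sketch with hypothetical names, where `stop()` uses `notifyAll()` to end the pause early:

```java
/** Hypothetical sketch: an interruptible pause between balancer iterations. */
final class IterationDelaySketch {
  private boolean stopped = false; // guarded by the monitor of `this`

  /** Waits up to timeoutMs; returns false if stop() was called or the thread was interrupted. */
  synchronized boolean waitBeforeNextIteration(long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    try {
      while (!stopped) { // loop guards against spurious wakeups
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          break;
        }
        wait(remaining); // releases the monitor while waiting; requires holding it
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !stopped;
  }

  /** Wakes up a waiting balancer thread immediately. */
  synchronized void stop() {
    stopped = true;
    notifyAll();
  }

  /** Shows that calling wait() without holding the monitor fails. */
  static boolean waitWithoutMonitorThrows() {
    Object monitor = new Object();
    try {
      monitor.wait(1);
      return false;
    } catch (IllegalMonitorStateException e) {
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

If no other thread ever needs to cut the pause short, a plain `Thread.sleep` would not need `synchronized` at all.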

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -180,6 +198,16 @@ private boolean initializeIteration() {
       return false;
     }
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.selectedContainers = new HashSet<>();
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();

Review comment:
      > Also, since those lists need to be initialized at least once, we need to create them here. Instead, if this initialization is moved to the constructor, we will need to construct every time we want to restart the balancer.
   
   Maybe we can construct these lists in the balancer's constructor and then reuse them by calling `clear()` in `initializeIteration()`.
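A minimal sketch of the suggestion, assuming `String` as a stand-in for `DatanodeUsageInfo` and `ContainerID` (`ReusableStateSketch` is a hypothetical name). The collections are allocated once per instance and emptied at the start of every iteration, including after a restart:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: per-iteration collections allocated once and cleared. */
final class ReusableStateSketch {
  // Allocated once per balancer instance.
  private final List<String> overUtilizedNodes = new ArrayList<>();
  private final List<String> underUtilizedNodes = new ArrayList<>();
  private final Set<String> selectedContainers = new HashSet<>();

  /** Called at the start of every iteration, including after a restart. */
  void initializeIteration(List<String> over, List<String> under) {
    overUtilizedNodes.clear();
    underUtilizedNodes.clear();
    selectedContainers.clear();
    overUtilizedNodes.addAll(over);
    underUtilizedNodes.addAll(under);
  }

  List<String> overUtilizedNodes() {
    return overUtilizedNodes;
  }

  List<String> underUtilizedNodes() {
    return underUtilizedNodes;
  }
}
```

The trade-off is that any code still holding a reference to one of these collections (for example a log statement or metric on another thread) sees it cleared in place, whereas allocating new lists leaves the previous contents untouched.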

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -63,19 +84,28 @@
   private ContainerBalancerMetrics metrics;
   private long clusterCapacity;
   private long clusterUsed;
-  private long clusterRemaining;
   private double clusterAvgUtilisation;
   private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
   private Thread currentBalancingThread;
   private Lock lock;
+  private ContainerBalancerSelectionCriteria selectionCriteria;
+  private Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap;
+  private Map<DatanodeDetails, Long> sizeLeavingNode;
+  private Map<DatanodeDetails, Long> sizeEnteringNode;
+  private Set<ContainerID> selectedContainers;
+  private FindTargetStrategy findTargetStrategy;
+  private PlacementPolicy placementPolicy;

Review comment:
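      The `placementPolicy` field seems to be unused; the constructor passes its `placementPolicy` parameter straight to `FindTargetGreedy`.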

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -224,86 +291,284 @@ private boolean initializeIteration() {
         withinThresholdUtilizedNodes.add(datanodeUsageInfo);
       }
     }
+    metrics.setDatanodesNumToBalance(new LongMetric(countDatanodesToBalance));
+    // TODO update dataSizeToBalanceGB metric with overLoadedBytes and
+    //  underLoadedBytes
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
+    }
+
+    LOG.info("Container Balancer has identified {} Over-Utilized and {} " +
+            "Under-Utilized Datanodes that need to be balanced.",
+        overUtilizedNodes.size(), underUtilizedNodes.size());
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
+    return true;
+  }
+
+  private IterationResult doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+    moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      IterationResult result = checkConditionsForBalancing();
+      if (result != null) {
+        LOG.info("Conditions for balancing failed. Exiting current " +
+            "iteration...");
+        return result;
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        if (moveContainer(source, moveSelection)) {
+          // consider move successful for now, and update selection criteria
+          potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+              selectedTargets, moveSelection, source);
+        }
+      }
+    }
+
+    // if not all underUtilized nodes have been selected, try to match
+    // withinThresholdUtilized nodes with underUtilized nodes
+    if (selectedTargets.size() < underUtilizedNodes.size()) {
+      potentialTargets.removeAll(selectedTargets);
+      Collections.reverse(withinThresholdUtilizedNodes);
+
+      for (DatanodeUsageInfo datanodeUsageInfo : withinThresholdUtilizedNodes) {
+        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+        IterationResult result = checkConditionsForBalancing();
+        if (result != null) {
+          LOG.info("Conditions for balancing failed. Exiting current " +
+              "iteration...");
+          return result;
+        }
+
+        ContainerMoveSelection moveSelection =
+            matchSourceWithTarget(source, potentialTargets);
+
+        if (moveSelection != null) {
+          LOG.info("ContainerBalancer is trying to move container {} from " +
+                  "source datanode {} to target datanode {}",
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
+              moveSelection.getTargetNode().getUuidString());
+
+          if (moveContainer(source, moveSelection)) {
+            // consider move successful for now, and update selection criteria
+            potentialTargets =
+                updateTargetsAndSelectionCriteria(potentialTargets,
+                    selectedTargets, moveSelection, source);
+          }
+        }
+      }
+    }
+
+    // check move results
+    for (Map.Entry<ContainerMoveSelection,
+            CompletableFuture<ReplicationManager.MoveResult>>
+        futureEntry : moveSelectionToFutureMap.entrySet()) {
+      ContainerMoveSelection moveSelection = futureEntry.getKey();
+      CompletableFuture<ReplicationManager.MoveResult> future =
+          futureEntry.getValue();
       try {
-        Thread.sleep(50);
+        ReplicationManager.MoveResult result = future.get(
+            config.getMoveTimeout().toMillis(), TimeUnit.MILLISECONDS);
+        if (result == ReplicationManager.MoveResult.COMPLETED) {
+          metrics.incrementMovedContainersNum(1);
+          //TODO update metrics with size moved in this iteration
+        }
       } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+        LOG.warn("Container move for container {} was interrupted.",
+            moveSelection.getContainerID(), e);
+      } catch (ExecutionException e) {
+        LOG.warn("Container move for container {} completed exceptionally.",
+            moveSelection.getContainerID(), e);
+      } catch (TimeoutException e) {
+        LOG.warn("Container move for container {} timed out.",
+            moveSelection.getContainerID(), e);
       }
-      /////////////////////////////
+    }
+    return IterationResult.ITERATION_COMPLETED;
+  }
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
-        return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" +
-            " balanced.");
-      }
+  /**
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   *
+   * @param potentialTargets Collection of potential targets to move
+   *                         container to
+   * @return ContainerMoveSelection containing the selected target and container
+   */
+  private ContainerMoveSelection matchSourceWithTarget(
+      DatanodeDetails source, Collection<DatanodeDetails> potentialTargets) {
+    NavigableSet<ContainerID> candidateContainers =
+        selectionCriteria.getCandidateContainers(source);
+
+    if (candidateContainers.isEmpty()) {
+      LOG.info("ContainerBalancer could not find any candidate containers for" +
+          " datanode {}", source.getUuidString());
+      return null;
     }
-    return true;
+    LOG.info("ContainerBalancer is finding suitable target for source " +
+        "datanode {}", source.getUuidString());
+    ContainerMoveSelection moveSelection =
+        findTargetStrategy.findTargetForContainerMove(
+            source, potentialTargets, candidateContainers,
+            this::canSizeEnterTarget);
+
+    if (moveSelection == null) {
+      LOG.info("ContainerBalancer could not find a suitable target for " +
+          "source node {}.", source.getUuidString());
+      return null;
+    }
+    LOG.info("ContainerBalancer matched source datanode {} with target " +
+            "datanode {} for container move.", source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    return moveSelection;
   }
 
   /**
-   * Performs binary search to determine if the specified listToSearch
-   * contains the specified node.
+   * Checks if limits maxDatanodesToInvolvePerIteration and
+   * maxSizeToMovePerIteration have not been hit.
    *
-   * @param listToSearch List of DatanodeUsageInfo to be searched.
-   * @param node DatanodeUsageInfo to be searched for.
-   * @return true if the specified node is present in listToSearch, otherwise
-   * false.
+   * @return {@link IterationResult#MAX_DATANODES_TO_INVOLVE_REACHED} if reached
+   * max datanodes to involve limit,
+   * {@link IterationResult#MAX_SIZE_TO_MOVE_REACHED} if reached max size to
+   * move limit, or null if balancing can continue
    */
-  private boolean containsNode(
-      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
-    int index = 0;
-    Comparator<DatanodeUsageInfo> comparator =
-        DatanodeUsageInfo.getMostUsedByRemainingRatio();
-    int size = listToSearch.size();
-    if (size == 0) {
-      return false;
+  private IterationResult checkConditionsForBalancing() {
+    if (countDatanodesInvolvedPerIteration + 2 >
+        maxDatanodesToInvolvePerIteration * totalNodesInCluster) {

Review comment:
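      It would be better to rename `maxDatanodesToInvolvePerIteration` to `maxDatanodesRatioToInvolvePerIteration`. The current name is a little confusing, since the value is multiplied by `totalNodesInCluster` and is therefore a ratio rather than a number of datanodes.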
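To illustrate why the current name reads like a count, here is a sketch of the same check using the suggested name (`DatanodeLimitSketch` is hypothetical, and the `+ 2` is presumably one source plus one target per move). With a ratio of 0.25 and 100 datanodes the limit is 25, so a move is still allowed with 23 datanodes involved (23 + 2 = 25) but not with 24 (26 > 25):

```java
/** Hypothetical sketch of the datanode limit check, using the suggested name. */
final class DatanodeLimitSketch {
  enum IterationResult { MAX_DATANODES_TO_INVOLVE_REACHED }

  /**
   * Returns MAX_DATANODES_TO_INVOLVE_REACHED if one more move would exceed
   * the limit, or null if balancing can continue.
   */
  static IterationResult checkDatanodeLimit(
      int countDatanodesInvolvedPerIteration,
      double maxDatanodesRatioToInvolvePerIteration,
      int totalNodesInCluster) {
    // The ratio is converted to an absolute number of datanodes here.
    double maxDatanodesToInvolve =
        maxDatanodesRatioToInvolvePerIteration * totalNodesInCluster;
    // A move adds a source and a target, hence "+ 2".
    if (countDatanodesInvolvedPerIteration + 2 > maxDatanodesToInvolve) {
      return IterationResult.MAX_DATANODES_TO_INVOLVE_REACHED;
    }
    return null;
  }
}
```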




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


