You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2022/06/01 12:19:45 UTC

[ozone] branch master updated: HDDS-6280. Support Container Balancer HA (#3423)

This is an automated email from the ASF dual-hosted git repository.

siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e8abd0f6e2 HDDS-6280. Support Container Balancer HA (#3423)
e8abd0f6e2 is described below

commit e8abd0f6e21dedf25672aaa6a2d638b8442680f0
Author: Siddhant Sangwan <si...@gmail.com>
AuthorDate: Wed Jun 1 17:49:40 2022 +0530

    HDDS-6280. Support Container Balancer HA (#3423)
---
 .../interface-client/src/main/proto/hdds.proto     |  19 ++
 .../scm/container/balancer/ContainerBalancer.java  | 291 ++++++++++++++++-----
 .../balancer/ContainerBalancerConfiguration.java   |  84 ++++++
 .../IllegalContainerBalancerStateException.java    |   5 +-
 ...lidContainerBalancerConfigurationException.java |  11 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMService.java  |   2 +-
 .../hadoop/hdds/scm/ha/SCMServiceException.java    |  44 ++++
 .../hadoop/hdds/scm/ha/SCMServiceManager.java      |   6 +-
 .../apache/hadoop/hdds/scm/ha/StatefulService.java |  23 +-
 .../scm/ha/StatefulServiceStateManagerImpl.java    |  14 +
 .../io/ByteStringCodec.java}                       |  33 +--
 .../apache/hadoop/hdds/scm/ha/io/CodecFactory.java |   2 +
 .../hdds/scm/server/SCMClientProtocolServer.java   |  16 +-
 .../hdds/scm/server/StorageContainerManager.java   |  11 +-
 .../container/balancer/TestContainerBalancer.java  | 157 +++++++----
 .../hadoop/ozone/scm/TestFailoverWithSCMHA.java    |  93 +++++++
 16 files changed, 660 insertions(+), 151 deletions(-)

diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index c99274da75..01597a7b0e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -431,3 +431,22 @@ message ReplicationManagerReportProto {
     repeated KeyIntValue stat = 2;
     repeated KeyContainerIDList statSample = 3;
 }
+
+message ContainerBalancerConfigurationProto {
+    optional string utilizationThreshold = 5;
+    optional int32 datanodesInvolvedMaxPercentagePerIteration = 6;
+    optional int64 sizeMovedMaxPerIteration = 7;
+    optional int64 sizeEnteringTargetMax = 8;
+    optional int64 sizeLeavingSourceMax = 9;
+    optional int32 iterations = 10;
+    optional string excludeContainers = 11;
+    optional int64 moveTimeout = 12;
+    optional int64 balancingIterationInterval = 13;
+    optional string includeDatanodes = 14;
+    optional string excludeDatanodes = 15;
+    optional bool moveNetworkTopologyEnable = 16;
+    optional bool triggerDuBeforeMoveEnable = 17;
+
+    required bool shouldRun = 18;
+    optional int32 nextIterationIndex = 19;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index c2d96ffbd2..77d9b5cc61 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.fs.DUFactory;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -33,7 +34,7 @@ import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.StatefulService;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -43,6 +44,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,7 +67,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DE
  * Container balancer is a service in SCM to move containers between over- and
  * under-utilized datanodes.
  */
-public class ContainerBalancer implements SCMService {
+public class ContainerBalancer extends StatefulService {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ContainerBalancer.class);
@@ -98,7 +100,6 @@ public class ContainerBalancer implements SCMService {
   private NetworkTopology networkTopology;
   private double upperLimit;
   private double lowerLimit;
-  private volatile boolean balancerRunning;
   private volatile Thread currentBalancingThread;
   private Lock lock;
   private ContainerBalancerSelectionCriteria selectionCriteria;
@@ -110,15 +111,17 @@ public class ContainerBalancer implements SCMService {
       CompletableFuture<LegacyReplicationManager.MoveResult>>
       moveSelectionToFutureMap;
   private IterationResult iterationResult;
+  private int nextIterationIndex;
 
   /**
    * Constructs ContainerBalancer with the specified arguments. Initializes
-   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
-   * Container Balancer does not start on construction.
+   * ContainerBalancerMetrics. Container Balancer does not start on
+   * construction.
    *
    * @param scm the storage container manager
    */
   public ContainerBalancer(StorageContainerManager scm) {
+    super(scm.getStatefulServiceStateManager());
     this.nodeManager = scm.getScmNodeManager();
     this.containerManager = scm.getContainerManager();
     this.replicationManager = scm.getReplicationManager();
@@ -134,13 +137,16 @@ public class ContainerBalancer implements SCMService {
     this.unBalancedNodes = new ArrayList<>();
     this.placementPolicy = scm.getContainerPlacementPolicy();
     this.networkTopology = scm.getClusterMap();
+    this.nextIterationIndex = 0;
 
     this.lock = new ReentrantLock();
     findSourceStrategy = new FindSourceGreedy(nodeManager);
+    scm.getSCMServiceManager().register(this);
   }
 
   /**
-   * Balances the cluster.
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
    */
   private void balance() {
     this.iterations = config.getIterations();
@@ -149,7 +155,11 @@ public class ContainerBalancer implements SCMService {
       this.iterations = Integer.MAX_VALUE;
     }
 
-    for (int i = 0; i < iterations && balancerRunning; i++) {
+    // nextIterationIndex is the iteration that balancer should start from on
+    // leader change or restart
+    int i = nextIterationIndex;
+    resetState();
+    for (; i < iterations && isBalancerRunning(); i++) {
       if (config.getTriggerDuEnable()) {
         // before starting a new iteration, we trigger all the datanode
         // to run `du`. this is an aggressive action, with which we can
@@ -179,19 +189,37 @@ public class ContainerBalancer implements SCMService {
         }
       }
 
-      // stop balancing if iteration is not initialized
+      // initialize this iteration. stop balancing on initialization failure
       if (!initializeIteration()) {
-        stopBalancer();
+        // just return if the reason for initialization failure is that
+        // balancer has been stopped in another thread
+        if (!isBalancerRunning()) {
+          return;
+        }
+        // otherwise, try to stop balancer
+        tryStopBalancer("Could not initialize ContainerBalancer's " +
+            "iteration number " + i);
         return;
       }
 
-      //if no new move option is generated, it means the cluster can
-      //not be balanced any more , so just stop
       IterationResult iR = doIteration();
       metrics.incrementNumIterations(1);
       LOG.info("Result of this iteration of Container Balancer: {}", iR);
+
+      // persist next iteration index
+      if (iR == IterationResult.ITERATION_COMPLETED) {
+        try {
+          saveConfiguration(config, true, i + 1);
+        } catch (IOException e) {
+          LOG.warn("Could not persist next iteration index value for " +
+              "ContainerBalancer after completing an iteration", e);
+        }
+      }
+
+      // if no new move option is generated, it means the cluster cannot be
+      // balanced anymore; so just stop balancer
       if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) {
-        stopBalancer();
+        tryStopBalancer(iR.toString());
         return;
       }
 
@@ -215,7 +243,11 @@ public class ContainerBalancer implements SCMService {
         }
       }
     }
-    stopBalancer();
+
+    // finally, stop balancer if it hasn't been stopped already
+    if (isBalancerRunning()) {
+      tryStopBalancer("Completed all iterations.");
+    }
   }
 
   /**
@@ -238,12 +270,11 @@ public class ContainerBalancer implements SCMService {
     List<DatanodeUsageInfo> datanodeUsageInfos =
         nodeManager.getMostOrLeastUsedDatanodes(true);
     if (datanodeUsageInfos.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Container Balancer could not retrieve nodes from Node " +
-            "Manager.");
-      }
+      LOG.warn("Received an empty list of datanodes from Node Manager when " +
+          "trying to identify which nodes to balance");
       return false;
     }
+
     this.threshold = config.getThresholdAsRatio();
     this.maxDatanodesRatioToInvolvePerIteration =
         config.getMaxDatanodesRatioToInvolvePerIteration();
@@ -801,24 +832,72 @@ public class ContainerBalancer implements SCMService {
   /**
    * Receives a notification for raft or safe mode related status changes.
    * Stops ContainerBalancer if it's running and the current SCM becomes a
-   * follower or enters safe mode.
+   * follower or enters safe mode. Starts ContainerBalancer if the current
+   * SCM becomes leader, is out of safe mode and balancer should run.
    */
   @Override
   public void notifyStatusChanged() {
-    if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
-      if (isBalancerRunning()) {
-        stopBalancingThread();
+    lock.lock();
+    try {
+      if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+        if (isBalancerRunning()) {
+          LOG.info("Stopping ContainerBalancer in this scm on status change");
+          stop();
+        }
+      } else {
+        if (shouldRun()) {
+          try {
+            LOG.info("Starting ContainerBalancer in this scm on status change");
+            start();
+          } catch (IllegalContainerBalancerStateException |
+              InvalidContainerBalancerConfigurationException e) {
+            LOG.warn("Could not start ContainerBalancer on raft/safe-mode " +
+                "status change.", e);
+          }
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 
   /**
-   * Checks if ContainerBalancer should run.
-   * @return false
+   * Checks if ContainerBalancer should start (after a leader change, restart
+   * etc.) by reading persisted state.
+   * @return the persisted shouldRun flag; false if it is missing or unreadable
    */
   @Override
   public boolean shouldRun() {
-    return false;
+    try {
+      ContainerBalancerConfigurationProto proto =
+          readConfiguration(ContainerBalancerConfigurationProto.class);
+      if (proto == null) {
+        LOG.warn("Could not find persisted configuration for {} when checking" +
+            " if ContainerBalancer should run. ContainerBalancer should not " +
+            "run now.", this.getServiceName());
+        return false;
+      }
+      return proto.getShouldRun();
+    } catch (IOException e) {
+      LOG.warn("Could not read persisted configuration for checking if " +
+          "ContainerBalancer should start. ContainerBalancer should not start" +
+          " now.", e);
+      return false;
+    }
+  }
+
+  /**
+   * Checks if ContainerBalancer is currently running in this SCM.
+   *
+   * @return true if the currentBalancingThread is not null, otherwise false
+   */
+  public boolean isBalancerRunning() {
+    lock.lock();
+    try {
+      return currentBalancingThread != null;
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -830,12 +909,47 @@ public class ContainerBalancer implements SCMService {
   }
 
   /**
-   * Starts ContainerBalancer as an SCMService.
+   * Starts ContainerBalancer as an SCMService. Validates state, reads and
+   * validates persisted configuration, and then starts the balancing
+   * thread.
+   * @throws IllegalContainerBalancerStateException if state validation fails
+   * or balancer should not run according to persisted configuration
+   * @throws InvalidContainerBalancerConfigurationException if persisted
+   * configuration cannot be retrieved, is null, or fails validation
    */
   @Override
-  public void start() {
-    if (shouldRun()) {
+  public void start() throws IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException {
+    lock.lock();
+    try {
+      // should be leader-ready, out of safe mode, and not running already
+      validateState(false);
+      ContainerBalancerConfigurationProto proto;
+      try {
+        proto = readConfiguration(ContainerBalancerConfigurationProto.class);
+      } catch (IOException e) {
+        throw new InvalidContainerBalancerConfigurationException("Could not " +
+            "retrieve persisted configuration while starting " +
+            "Container Balancer as an SCMService. Will not start now.", e);
+      }
+      if (proto == null) {
+        throw new InvalidContainerBalancerConfigurationException("Persisted " +
+            "configuration for ContainerBalancer is null during start. Will " +
+            "not start now.");
+      }
+      if (!proto.getShouldRun()) {
+        throw new IllegalContainerBalancerStateException("According to " +
+            "persisted configuration, ContainerBalancer should not run.");
+      }
+      ContainerBalancerConfiguration configuration =
+          ContainerBalancerConfiguration.fromProtobuf(proto,
+              ozoneConfiguration);
+      validateConfiguration(configuration);
+      this.config = configuration;
+      this.nextIterationIndex = proto.getNextIterationIndex();
       startBalancingThread();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -848,13 +962,16 @@ public class ContainerBalancer implements SCMService {
    * @throws InvalidContainerBalancerConfigurationException if
    * {@link ContainerBalancerConfiguration} config file is incorrectly
    * configured
+   * @throws IOException on failure to persist
+   * {@link ContainerBalancerConfiguration}
    */
-  public void startBalancer() throws IllegalContainerBalancerStateException,
-      InvalidContainerBalancerConfigurationException {
+  public void startBalancer(ContainerBalancerConfiguration configuration)
+      throws IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException, IOException {
     lock.lock();
     try {
-      validateState();
-      validateConfiguration(this.config);
+      // validates state, config, and then saves config
+      setBalancerConfigOnStartBalancer(configuration);
       startBalancingThread();
     } finally {
       lock.unlock();
@@ -867,7 +984,6 @@ public class ContainerBalancer implements SCMService {
   private void startBalancingThread() {
     lock.lock();
     try {
-      balancerRunning = true;
       currentBalancingThread = new Thread(this::balance);
       currentBalancingThread.setName("ContainerBalancer");
       currentBalancingThread.setDaemon(true);
@@ -879,11 +995,17 @@ public class ContainerBalancer implements SCMService {
   }
 
   /**
-   * Checks if ContainerBalancer can start.
-   * @throws IllegalContainerBalancerStateException if ContainerBalancer is
-   * already running, SCM is in safe mode, or SCM is not leader ready
+   * Validates balancer's state based on the specified expectedRunning.
+   * Confirms SCM is leader-ready and out of safe mode.
+   *
+   * @param expectedRunning true if ContainerBalancer is expected to be
+   *                        running, else false
+   * @throws IllegalContainerBalancerStateException if SCM is not
+   * leader-ready, is in safe mode, or state does not match the specified
+   * expected state
    */
-  private void validateState() throws IllegalContainerBalancerStateException {
+  private void validateState(boolean expectedRunning)
+      throws IllegalContainerBalancerStateException {
     if (!scmContext.isLeaderReady()) {
       LOG.warn("SCM is not leader ready");
       throw new IllegalContainerBalancerStateException("SCM is not leader " +
@@ -895,10 +1017,10 @@ public class ContainerBalancer implements SCMService {
     }
     lock.lock();
     try {
-      if (isBalancerRunning() || currentBalancingThread != null) {
-        LOG.warn("Cannot start ContainerBalancer because it's already running");
+      if (isBalancerRunning() != expectedRunning) {
         throw new IllegalContainerBalancerStateException(
-            "Cannot start ContainerBalancer because it's already running");
+            "Expect ContainerBalancer running state to be " + expectedRunning +
+                ", but running state is actually " + isBalancerRunning());
       }
     } finally {
       lock.unlock();
@@ -910,20 +1032,49 @@ public class ContainerBalancer implements SCMService {
    */
   @Override
   public void stop() {
-    stopBalancer();
+    lock.lock();
+    try {
+      if (!isBalancerRunning()) {
+        LOG.warn("Cannot stop Container Balancer because it's not running");
+        return;
+      }
+      stopBalancingThread();
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
    * Stops ContainerBalancer gracefully.
    */
-  public void stopBalancer() {
+  public void stopBalancer()
+      throws IOException, IllegalContainerBalancerStateException {
     lock.lock();
     try {
-      if (!isBalancerRunning()) {
-        LOG.info("Container Balancer is not running.");
-        return;
-      }
-      stopBalancingThread();
+      // should be leader-ready, out of safe mode, and currently running
+      validateState(true);
+      saveConfiguration(config, false, 0);
+      stop();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Tries to stop ContainerBalancer via {@link #stopBalancer()}. Logs the
+   * reason for stopping; a failure to stop is logged and not rethrown.
+   * @param stopReason a string specifying the reason for stopping
+   *                   ContainerBalancer.
+   */
+  private void tryStopBalancer(String stopReason) {
+    lock.lock();
+    try {
+      LOG.info("Stopping ContainerBalancer. Reason for stopping: {}",
+          stopReason);
+      stopBalancer();
+    } catch (IllegalContainerBalancerStateException | IOException e) {
+      LOG.warn("Tried to stop ContainerBalancer but failed. Reason for " +
+          "stopping: {}", stopReason, e);
     } finally {
       lock.unlock();
     }
@@ -933,7 +1084,6 @@ public class ContainerBalancer implements SCMService {
     Thread balancingThread;
     lock.lock();
     try {
-      balancerRunning = false;
       balancingThread = currentBalancingThread;
       currentBalancingThread = null;
     } finally {
@@ -952,6 +1102,20 @@ public class ContainerBalancer implements SCMService {
     LOG.info("Container Balancer stopped successfully.");
   }
 
+  private void saveConfiguration(ContainerBalancerConfiguration configuration,
+                                 boolean shouldRun, int index)
+      throws IOException {
+    lock.lock();
+    try {
+      saveConfiguration(configuration.toProtobufBuilder()
+          .setShouldRun(shouldRun)
+          .setNextIterationIndex(index)
+          .build());
+    } finally {
+      lock.unlock();
+    }
+  }
+
   private void validateConfiguration(ContainerBalancerConfiguration conf)
       throws InvalidContainerBalancerConfigurationException {
     // maxSizeEnteringTarget and maxSizeLeavingSource should by default be
@@ -1007,12 +1171,24 @@ public class ContainerBalancer implements SCMService {
   }
 
   /**
-   * Sets the configuration that ContainerBalancer will use. This should be
-   * set before starting balancer.
-   * @param config ContainerBalancerConfiguration
+   * Persists the configuration that ContainerBalancer will use after
+   * validating state and the specified configuration.
+   * @param configuration ContainerBalancerConfiguration to persist
+   * @throws InvalidContainerBalancerConfigurationException on failure to
+   * validate the specified configuration
+   * @throws IllegalContainerBalancerStateException if this SCM is not
+   * leader-ready, is in safe mode, or if ContainerBalancer is currently
+   * running in this SCM
+   * @throws IOException on failure to persist configuration
    */
-  public void setConfig(ContainerBalancerConfiguration config) {
-    this.config = config;
+  private void setBalancerConfigOnStartBalancer(
+      ContainerBalancerConfiguration configuration)
+      throws InvalidContainerBalancerConfigurationException,
+      IllegalContainerBalancerStateException, IOException {
+    validateState(false);
+    validateConfiguration(configuration);
+    saveConfiguration(configuration, true, 0);
+    this.config = configuration;
   }
 
   /**
@@ -1060,15 +1236,6 @@ public class ContainerBalancer implements SCMService {
     return sourceToTargetMap;
   }
 
-  /**
-   * Checks if ContainerBalancer is currently running.
-   *
-   * @return true if ContainerBalancer is running, false if not running.
-   */
-  public boolean isBalancerRunning() {
-    return balancerRunning;
-  }
-
   @VisibleForTesting
   int getCountDatanodesInvolvedPerIteration() {
     return countDatanodesInvolvedPerIteration;
@@ -1096,7 +1263,7 @@ public class ContainerBalancer implements SCMService {
   public String toString() {
     String status = String.format("%nContainer Balancer status:%n" +
         "%-30s %s%n" +
-        "%-30s %b%n", "Key", "Value", "Running", balancerRunning);
+        "%-30s %b%n", "Key", "Value", "Running", isBalancerRunning());
     return status + config.toString();
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 4e994c8a38..9d5083768c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -22,8 +22,11 @@ import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,6 +212,10 @@ public final class ContainerBalancerConfiguration {
     return triggerDuEnable;
   }
 
+  public void setTriggerDuEnable(boolean enable) {
+    triggerDuEnable = enable;
+  }
+
   /**
    * Set the NetworkTopologyEnable value for Container Balancer.
    *
@@ -315,6 +322,10 @@ public final class ContainerBalancerConfiguration {
     this.moveTimeout = duration.toMillis();
   }
 
+  public void setMoveTimeout(long millis) {
+    this.moveTimeout = millis;
+  }
+
   public Duration getBalancingInterval() {
     return Duration.ofMillis(balancingInterval);
   }
@@ -323,6 +334,10 @@ public final class ContainerBalancerConfiguration {
     this.balancingInterval = balancingInterval.toMillis();
   }
 
+  public void setBalancingInterval(long millis) {
+    this.balancingInterval = millis;
+  }
+
   /**
    * Gets a set of datanode hostnames or ip addresses that will be the exclusive
    * participants in balancing.
@@ -390,4 +405,73 @@ public final class ContainerBalancerConfiguration {
         "Max Size Leaving Source per Iteration",
         maxSizeLeavingSource / OzoneConsts.GB);
   }
+
+  ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
+    ContainerBalancerConfigurationProto.Builder builder =
+        ContainerBalancerConfigurationProto.newBuilder();
+
+    builder.setUtilizationThreshold(threshold)
+        .setDatanodesInvolvedMaxPercentagePerIteration(
+            maxDatanodesPercentageToInvolvePerIteration)
+        .setSizeMovedMaxPerIteration(maxSizeToMovePerIteration)
+        .setSizeEnteringTargetMax(maxSizeEnteringTarget)
+        .setSizeLeavingSourceMax(maxSizeLeavingSource)
+        .setIterations(iterations)
+        .setExcludeContainers(excludeContainers)
+        .setMoveTimeout(moveTimeout)
+        .setBalancingIterationInterval(balancingInterval)
+        .setIncludeDatanodes(includeNodes)
+        .setExcludeDatanodes(excludeNodes)
+        .setMoveNetworkTopologyEnable(networkTopologyEnable)
+        .setTriggerDuBeforeMoveEnable(triggerDuEnable);
+    return builder;
+  }
+
+  static ContainerBalancerConfiguration fromProtobuf(
+      @NotNull ContainerBalancerConfigurationProto proto,
+      @NotNull OzoneConfiguration ozoneConfiguration) {
+    ContainerBalancerConfiguration config =
+        ozoneConfiguration.getObject(ContainerBalancerConfiguration.class);
+    if (proto.hasUtilizationThreshold()) {
+      config.setThreshold(Double.parseDouble(proto.getUtilizationThreshold()));
+    }
+    if (proto.hasDatanodesInvolvedMaxPercentagePerIteration()) {
+      config.setMaxDatanodesPercentageToInvolvePerIteration(
+          proto.getDatanodesInvolvedMaxPercentagePerIteration());
+    }
+    if (proto.hasSizeMovedMaxPerIteration()) {
+      config.setMaxSizeToMovePerIteration(proto.getSizeMovedMaxPerIteration());
+    }
+    if (proto.hasSizeEnteringTargetMax()) {
+      config.setMaxSizeEnteringTarget(proto.getSizeEnteringTargetMax());
+    }
+    if (proto.hasSizeLeavingSourceMax()) {
+      config.setMaxSizeLeavingSource(proto.getSizeLeavingSourceMax());
+    }
+    if (proto.hasIterations()) {
+      config.setIterations(proto.getIterations());
+    }
+    if (proto.hasExcludeContainers()) {
+      config.setExcludeContainers(proto.getExcludeContainers());
+    }
+    if (proto.hasMoveTimeout()) {
+      config.setMoveTimeout(proto.getMoveTimeout());
+    }
+    if (proto.hasBalancingIterationInterval()) {
+      config.setBalancingInterval(proto.getBalancingIterationInterval());
+    }
+    if (proto.hasIncludeDatanodes()) {
+      config.setIncludeNodes(proto.getIncludeDatanodes());
+    }
+    if (proto.hasExcludeDatanodes()) {
+      config.setExcludeNodes(proto.getExcludeDatanodes());
+    }
+    if (proto.hasMoveNetworkTopologyEnable()) {
+      config.setNetworkTopologyEnable(proto.getMoveNetworkTopologyEnable());
+    }
+    if (proto.hasTriggerDuBeforeMoveEnable()) {
+      config.setTriggerDuEnable(proto.getTriggerDuBeforeMoveEnable());
+    }
+    return config;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
index cc938e636f..b061ddf7c9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hdds.scm.container.balancer;
 
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
 /**
  * Signals that a state change cannot be performed on ContainerBalancer.
  */
-public class IllegalContainerBalancerStateException extends Exception {
+public class IllegalContainerBalancerStateException extends
+    SCMServiceException {
 
   /**
    * Constructs an IllegalContainerBalancerStateException with no detail
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
index c6a6bf030b..9a4cc86ca4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.hdds.scm.container.balancer;
 
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
+import java.io.IOException;
+
 /**
  * Signals that {@link ContainerBalancerConfiguration} contains invalid
  * configuration value(s).
  */
-public class InvalidContainerBalancerConfigurationException extends Exception {
+public class InvalidContainerBalancerConfigurationException extends
+    SCMServiceException {
 
   /**
    * Constructs an InvalidContainerBalancerConfigurationException with no detail
@@ -44,4 +49,8 @@ public class InvalidContainerBalancerConfigurationException extends Exception {
     super(s);
   }
 
+  public InvalidContainerBalancerConfigurationException(String s,
+                                                        IOException e) {
+    super(s, e);
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
index 4d7c435b1e..2b185c9e4e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
@@ -66,7 +66,7 @@ public interface SCMService {
   /**
    * starts the SCM service.
    */
-  void start();
+  void start() throws SCMServiceException;
 
   /**
    * stops the SCM service.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
new file mode 100644
index 0000000000..72fb7d25d2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * Checked exceptions thrown by an {@link SCMService}.
+ */
+public class SCMServiceException extends Exception {
+
+  /**
+   * Constructs a new exception with {@code null} as its detail message.
+   */
+  public SCMServiceException() {
+    super();
+  }
+
+  public SCMServiceException(String s) {
+    super(s);
+  }
+
+  public SCMServiceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SCMServiceException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
index 1b75c4feed..2ab8f8ea24 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
@@ -70,7 +70,13 @@ public final class SCMServiceManager {
   public synchronized void start() {
     for (SCMService service : services) {
-      LOG.debug("Stopping service:{}.", service.getServiceName());
-      service.start();
+      LOG.debug("Starting service:{}.", service.getServiceName());
+      try {
+        service.start();
+      } catch (SCMServiceException e) {
+        // One service failing to start must not keep the remaining
+        // registered services from being started: log it and move on.
+        LOG.warn("Could not start {}", service.getServiceName(), e);
+      }
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
index 441e83ce13..69df7c740f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdds.scm.ha;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -35,10 +33,11 @@ public abstract class StatefulService implements SCMService {
 
   /**
    * Initialize a StatefulService from an extending class.
-   * @param scm {@link StorageContainerManager}
+   * @param stateManager a reference to the
+   * {@link StatefulServiceStateManager} from SCM.
    */
-  protected StatefulService(StorageContainerManager scm) {
-    stateManager = scm.getStatefulServiceStateManager();
+  protected StatefulService(StatefulServiceStateManager stateManager) {
+    this.stateManager = stateManager;
   }
 
   /**
@@ -60,21 +59,27 @@ public abstract class StatefulService implements SCMService {
    *
    * @param configType the Class object of the protobuf message type
    * @param <T>        the Type of the protobuf message
-   * @return persisted protobuf message
+   * @return persisted protobuf message or null if the entry is not found
    * @throws IOException on failure to fetch the message from DB or when
    *                     parsing it. ensure the specified configType is correct
    */
   protected final <T extends GeneratedMessage> T readConfiguration(
       Class<T> configType) throws IOException {
+    ByteString byteString = stateManager.readConfiguration(getServiceName());
+    if (byteString == null) {
+      return null;
+    }
     try {
       return configType.cast(ReflectionUtil.getMethod(configType,
               "parseFrom", ByteString.class)
-          .invoke(null, stateManager.readConfiguration(getServiceName())));
+          .invoke(null, byteString));
     } catch (NoSuchMethodException | IllegalAccessException
         | InvocationTargetException e) {
-      e.printStackTrace();
-      throw new InvalidProtocolBufferException("GeneratedMessage cannot " +
-          "be parsed for type " + configType + ": " + e.getMessage());
+      // the reflective failure travels as the cause of the IOException, so
+      // callers can log it; nothing is dumped to stderr here
+      throw new IOException("GeneratedMessage cannot be parsed. Ensure that "
+          + configType + " is the correct expected message type for " +
+          this.getServiceName(), e);
     }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
index d470f1bf7f..1e7a756f41 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
@@ -23,6 +23,8 @@ import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Proxy;
@@ -34,6 +36,9 @@ import java.lang.reflect.Proxy;
 public final class StatefulServiceStateManagerImpl
     implements StatefulServiceStateManager {
 
+  public static final Logger LOG =
+      LoggerFactory.getLogger(StatefulServiceStateManagerImpl.class);
+
   // this table maps the service name to the configuration (ByteString)
   private Table<String, ByteString> statefulServiceConfig;
   private final DBTransactionBuffer transactionBuffer;
@@ -52,10 +57,19 @@ public final class StatefulServiceStateManagerImpl
   public void saveConfiguration(String serviceName, ByteString bytes)
       throws IOException {
     transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added specified bytes to the transaction buffer for key " +
+          "{} to table {}", serviceName, statefulServiceConfig.getName());
+    }
+
     if (transactionBuffer instanceof SCMHADBTransactionBuffer) {
       SCMHADBTransactionBuffer buffer =
               (SCMHADBTransactionBuffer) transactionBuffer;
       buffer.flush();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Transaction buffer flushed");
+      }
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
similarity index 52%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
index cc938e636f..e0cf00c52a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
@@ -15,32 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.hadoop.hdds.scm.ha.io;
 
-package org.apache.hadoop.hdds.scm.container.balancer;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Signals that a state change cannot be performed on ContainerBalancer.
+ * A dummy codec that serializes a ByteString object to ByteString.
  */
-public class IllegalContainerBalancerStateException extends Exception {
+public class ByteStringCodec implements Codec {
 
-  /**
-   * Constructs an IllegalContainerBalancerStateException with no detail
-   * message. A detail message is a String that describes this particular
-   * exception.
-   */
-  public IllegalContainerBalancerStateException() {
-    super();
+  @Override
+  public ByteString serialize(Object object)
+      throws InvalidProtocolBufferException {
+    return (ByteString) object;
   }
 
-  /**
-   * Constructs an IllegalContainerBalancerStateException with the specified
-   * detail message. A detail message is a String that describes this particular
-   * exception.
-   *
-   * @param s the String that contains a detailed message
-   */
-  public IllegalContainerBalancerStateException(String s) {
-    super(s);
+  @Override
+  public Object deserialize(Class<?> type, ByteString value)
+      throws InvalidProtocolBufferException {
+    return value;
   }
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
index 9fb771b7a7..dae2b3ce6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.scm.ha.io;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.GeneratedMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.ProtocolMessageEnum;
@@ -45,6 +46,7 @@ public final class CodecFactory {
     codecs.put(Boolean.class, new BooleanCodec());
     codecs.put(BigInteger.class, new BigIntegerCodec());
     codecs.put(X509Certificate.class, new X509CertificateCodec());
+    codecs.put(ByteString.class, new ByteStringCodec());
   }
 
   private CodecFactory() { }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2d38331ceb..fcee862029 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -909,10 +909,9 @@ public class SCMClientProtocolServer implements
     }
 
     ContainerBalancer containerBalancer = scm.getContainerBalancer();
-    containerBalancer.setConfig(cbc);
     try {
-      containerBalancer.startBalancer();
-    } catch (IllegalContainerBalancerStateException |
+      containerBalancer.startBalancer(cbc);
+    } catch (IllegalContainerBalancerStateException | IOException |
         InvalidContainerBalancerConfigurationException e) {
       AUDIT.logWriteFailure(buildAuditMessageForFailure(
           SCMAction.START_CONTAINER_BALANCER, null, e));
@@ -931,9 +930,14 @@ public class SCMClientProtocolServer implements
   @Override
   public void stopContainerBalancer() throws IOException {
     getScm().checkAdminAccess(getRemoteUser());
-    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-        SCMAction.STOP_CONTAINER_BALANCER, null));
-    scm.getContainerBalancer().stopBalancer();
+    try {
+      scm.getContainerBalancer().stopBalancer();
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+          SCMAction.STOP_CONTAINER_BALANCER, null));
+    } catch (IllegalContainerBalancerStateException e) {
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          SCMAction.STOP_CONTAINER_BALANCER, null, e));
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9e651c8dfd..4440d4a67d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1469,10 +1469,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   @Override
   public void stop() {
     try {
-      LOG.info("Stopping Container Balancer service.");
-      containerBalancer.stopBalancer();
+      if (containerBalancer.isBalancerRunning()) {
+        LOG.info("Stopping Container Balancer service.");
+        // stop ContainerBalancer thread in this scm
+        containerBalancer.stop();
+      } else {
+        LOG.info("Container Balancer is not running.");
+      }
     } catch (Exception e) {
-      LOG.error("Failed to stop Container Balancer service.");
+      LOG.error("Failed to stop Container Balancer service.", e);
     }
 
     try {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 4901617f9f..98c92b2242 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.container.balancer;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -38,8 +38,11 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
 import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -49,14 +52,13 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -96,21 +98,24 @@ public class TestContainerBalancer {
   private Map<ContainerID, ContainerInfo> cidToInfoMap = new HashMap<>();
   private Map<DatanodeUsageInfo, Set<ContainerID>> datanodeToContainersMap =
       new HashMap<>();
+  private Map<String, ByteString> serviceToConfigMap = new HashMap<>();
   private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
 
-  @Rule
-  public TemporaryFolder tempFolder = new TemporaryFolder();
+  private StatefulServiceStateManager serviceStateManager;
 
   /**
    * Sets up configuration values and creates a mock cluster.
    */
   @Before
-  public void setup() throws SCMException, NodeNotFoundException {
+  public void setup() throws IOException, NodeNotFoundException {
     conf = new OzoneConfiguration();
     scm = Mockito.mock(StorageContainerManager.class);
     containerManager = Mockito.mock(ContainerManager.class);
     replicationManager = Mockito.mock(ReplicationManager.class);
+    serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
+    SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class);
 
+    // these configs will usually be specified in each test
     balancerConfiguration =
         conf.getObject(ContainerBalancerConfiguration.class);
     balancerConfiguration.setThreshold(10);
@@ -161,7 +166,30 @@ public class TestContainerBalancer {
     when(scm.getClusterMap()).thenReturn(null);
     when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class));
     when(scm.getConfiguration()).thenReturn(conf);
+    when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager);
+    when(scm.getSCMServiceManager()).thenReturn(scmServiceManager);
 
+    /*
+    When StatefulServiceStateManager#saveConfiguration is called, save to
+    in-memory serviceToConfigMap instead.
+     */
+    Mockito.doAnswer(i -> {
+      serviceToConfigMap.put(i.getArgument(0, String.class), i.getArgument(1,
+          ByteString.class));
+      return null;
+    }).when(serviceStateManager).saveConfiguration(
+        Mockito.any(String.class),
+        Mockito.any(ByteString.class));
+
+    /*
+    When StatefulServiceStateManager#readConfiguration is called, read from
+    serviceToConfigMap instead.
+     */
+    when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer(
+        i -> serviceToConfigMap.get(i.getArgument(0, String.class)));
+
+    Mockito.doNothing().when(scmServiceManager)
+        .register(Mockito.any(SCMService.class));
     containerBalancer = new ContainerBalancer(scm);
   }
 
@@ -184,7 +212,9 @@ public class TestContainerBalancer {
    */
   @Test
   public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() {
+      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     List<DatanodeUsageInfo> expectedUnBalancedNodes;
     List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
 
@@ -207,7 +237,7 @@ public class TestContainerBalancer {
       unBalancedNodesAccordingToBalancer =
           containerBalancer.getUnBalancedNodes();
 
-      containerBalancer.stopBalancer();
+      stopBalancer();
       Assert.assertEquals(
           expectedUnBalancedNodes.size(),
           unBalancedNodesAccordingToBalancer.size());
@@ -224,13 +254,15 @@ public class TestContainerBalancer {
    * balanced.
    */
   @Test
-  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
+  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(99.99);
     startBalancer(balancerConfiguration);
 
     sleepWhileBalancing(100);
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
     Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
     Assert.assertEquals(0, metrics.getNumDatanodesUnbalanced());
@@ -241,7 +273,9 @@ public class TestContainerBalancer {
    * maxDatanodesRatioToInvolvePerIteration limit.
    */
   @Test
-  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() {
+  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     int percent = 20;
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
         percent);
@@ -259,11 +293,13 @@ public class TestContainerBalancer {
     Assert.assertTrue(metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
     Assert.assertFalse(
         metrics.getNumDatanodesInvolvedInLatestIteration() > number);
-    containerBalancer.stopBalancer();
+    stopBalancer();
   }
 
   @Test
-  public void containerBalancerShouldSelectOnlyClosedContainers() {
+  public void containerBalancerShouldSelectOnlyClosedContainers()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     // make all containers open, balancer should not select any of them
     for (ContainerInfo containerInfo : cidToInfoMap.values()) {
       containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
@@ -271,7 +307,7 @@ public class TestContainerBalancer {
     balancerConfiguration.setThreshold(10);
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // balancer should have identified unbalanced nodes
     Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
@@ -291,7 +327,7 @@ public class TestContainerBalancer {
     }
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // check whether all selected containers are closed
     for (ContainerMoveSelection moveSelection:
@@ -303,7 +339,9 @@ public class TestContainerBalancer {
   }
 
   @Test
-  public void containerBalancerShouldObeyMaxSizeToMoveLimit() {
+  public void containerBalancerShouldObeyMaxSizeToMoveLimit()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(1);
     balancerConfiguration.setMaxSizeToMovePerIteration(10 * OzoneConsts.GB);
     balancerConfiguration.setIterations(1);
@@ -319,11 +357,13 @@ public class TestContainerBalancer {
         .getDataSizeMovedGBInLatestIteration();
     Assert.assertTrue(size > 0);
     Assert.assertFalse(size > 10);
-    containerBalancer.stopBalancer();
+    stopBalancer();
   }
 
   @Test
-  public void targetDatanodeShouldNotAlreadyContainSelectedContainer() {
+  public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -336,7 +376,7 @@ public class TestContainerBalancer {
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
         containerBalancer.getSourceToTargetMap();
     for (ContainerMoveSelection moveSelection : sourceToTargetMap.values()) {
@@ -350,7 +390,9 @@ public class TestContainerBalancer {
   }
 
   @Test
-  public void containerMoveSelectionShouldFollowPlacementPolicy() {
+  public void containerMoveSelectionShouldFollowPlacementPolicy()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -363,7 +405,7 @@ public class TestContainerBalancer {
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
         containerBalancer.getSourceToTargetMap();
 
@@ -392,7 +434,8 @@ public class TestContainerBalancer {
 
   @Test
   public void targetDatanodeShouldBeInServiceHealthy()
-      throws NodeNotFoundException {
+      throws NodeNotFoundException, IllegalContainerBalancerStateException,
+      IOException, InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -407,7 +450,7 @@ public class TestContainerBalancer {
     } catch (InterruptedException e) {
     }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     for (ContainerMoveSelection moveSelection :
         containerBalancer.getSourceToTargetMap().values()) {
       DatanodeDetails target = moveSelection.getTargetNode();
@@ -419,7 +462,9 @@ public class TestContainerBalancer {
   }
 
   @Test
-  public void selectedContainerShouldNotAlreadyHaveBeenSelected() {
+  public void selectedContainerShouldNotAlreadyHaveBeenSelected()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -434,7 +479,7 @@ public class TestContainerBalancer {
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Set<ContainerID> containers = new HashSet<>();
     for (ContainerMoveSelection moveSelection :
         containerBalancer.getSourceToTargetMap().values()) {
@@ -445,7 +490,9 @@ public class TestContainerBalancer {
   }
 
   @Test
-  public void balancerShouldNotSelectConfiguredExcludeContainers() {
+  public void balancerShouldNotSelectConfiguredExcludeContainers()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -461,7 +508,7 @@ public class TestContainerBalancer {
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Set<ContainerID> excludeContainers =
         balancerConfiguration.getExcludeContainers();
     for (ContainerMoveSelection moveSelection :
@@ -472,7 +519,9 @@ public class TestContainerBalancer {
   }
 
   @Test
-  public void balancerShouldObeyMaxSizeEnteringTargetLimit() {
+  public void balancerShouldObeyMaxSizeEnteringTargetLimit()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     conf.set("ozone.scm.container.size", "1MB");
     balancerConfiguration =
         conf.getObject(ContainerBalancerConfiguration.class);
@@ -487,7 +536,7 @@ public class TestContainerBalancer {
 
     Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
     Assert.assertTrue(containerBalancer.getSourceToTargetMap().isEmpty());
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // some containers should be selected when using default values
     OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
@@ -497,26 +546,28 @@ public class TestContainerBalancer {
 
     sleepWhileBalancing(500);
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     // balancer should have identified unbalanced nodes
     Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
     Assert.assertFalse(containerBalancer.getSourceToTargetMap().isEmpty());
   }
 
   @Test
-  public void testMetrics() {
+  public void testMetrics()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     conf.set("hdds.datanode.du.refresh.period", "1ms");
     balancerConfiguration.setBalancingInterval(Duration.ofMillis(2));
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setIterations(1);
     balancerConfiguration.setMaxSizeEnteringTarget(6 * OzoneConsts.GB);
-    // deliberately set max size per iteration to a low value, 6GB
+    // deliberately set max size per iteration to a low value, 6 GB
     balancerConfiguration.setMaxSizeToMovePerIteration(6 * OzoneConsts.GB);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
 
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
     Assert.assertEquals(determineExpectedUnBalancedNodes(
@@ -535,7 +586,9 @@ public class TestContainerBalancer {
    * exclude configurations, then it should be excluded.
    */
   @Test
-  public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() {
+  public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setIterations(1);
     balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -569,7 +622,7 @@ public class TestContainerBalancer {
     balancerConfiguration.setIncludeNodes(includeNodes);
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // finally, these should be the only nodes included in balancing
     // (included - excluded)
@@ -606,7 +659,9 @@ public class TestContainerBalancer {
 
   @Test
   public void checkIterationResult()
-      throws NodeNotFoundException, ContainerNotFoundException {
+      throws NodeNotFoundException, IOException,
+      IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setIterations(1);
     balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -622,7 +677,7 @@ public class TestContainerBalancer {
      */
     Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
         containerBalancer.getIterationResult());
-    containerBalancer.stop();
+    stopBalancer();
 
     /*
     Now, limit maxSizeToMovePerIteration but fail all container moves. The
@@ -640,12 +695,14 @@ public class TestContainerBalancer {
 
     Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
         containerBalancer.getIterationResult());
-    containerBalancer.stop();
+    stopBalancer();
   }
 
   @Test
   public void checkIterationResultTimeout()
-      throws NodeNotFoundException, ContainerNotFoundException {
+      throws NodeNotFoundException, IOException,
+      IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException {
 
     Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
             Mockito.any(DatanodeDetails.class),
@@ -673,7 +730,7 @@ public class TestContainerBalancer {
             .getNumContainerMovesCompletedInLatestIteration());
     Assert.assertTrue(containerBalancer.getMetrics()
             .getNumContainerMovesTimeoutInLatestIteration() > 1);
-    containerBalancer.stop();
+    stopBalancer();
 
   }
 
@@ -859,13 +916,19 @@ public class TestContainerBalancer {
     }
   }
 
-  private void startBalancer(ContainerBalancerConfiguration config) {
-    containerBalancer.setConfig(config);
+  private void startBalancer(ContainerBalancerConfiguration config)
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
+    containerBalancer.startBalancer(config);
+  }
+
+  private void stopBalancer() {
     try {
-      containerBalancer.startBalancer();
-    } catch (IllegalContainerBalancerStateException |
-        InvalidContainerBalancerConfigurationException e) {
-      LOG.info("Could not start ContainerBalancer while testing", e);
+      if (containerBalancer.isBalancerRunning()) {
+        containerBalancer.stopBalancer();
+      }
+    } catch (IOException | IllegalContainerBalancerStateException e) {
+      LOG.warn("Failed to stop balancer", e);
     }
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
index b4882ac980..906b2aaf70 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
@@ -16,10 +16,17 @@
  */
 package org.apache.hadoop.ozone.scm;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration;
+import org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException;
+import org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -36,6 +43,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -44,7 +52,9 @@ import org.slf4j.event.Level;
 import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 
@@ -226,6 +236,89 @@ public class TestFailoverWithSCMHA {
     Assert.assertFalse(inflightMove.containsKey(id));
   }
 
+  /**
+   * Starts ContainerBalancer when the cluster is already balanced.
+   * ContainerBalancer will identify that no unbalanced nodes are present and
+   * exit and stop in the first iteration. We test that ContainerBalancer
+   * persists ContainerBalancerConfigurationProto#shouldRun as false in all
+   * the 3 SCMs when it stops.
+   * @throws IOException if starting the balancer or reading the DB fails
+   * @throws IllegalContainerBalancerStateException from startBalancer
+   * @throws InvalidContainerBalancerConfigurationException from startBalancer
+   * @throws InterruptedException if interrupted while waiting
+   * @throws TimeoutException if a waited-for condition is not met in time
+   */
+  @Test
+  public void testContainerBalancerPersistsConfigurationInAllSCMs()
+      throws IOException, IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException, InterruptedException,
+      TimeoutException {
+    SCMClientConfig scmClientConfig =
+        conf.getObject(SCMClientConfig.class);
+    scmClientConfig.setRetryInterval(100);
+    scmClientConfig.setMaxRetryTimeout(1500);
+    Assertions.assertEquals(15, scmClientConfig.getRetryCount());
+    conf.setFromObject(scmClientConfig);
+    StorageContainerManager leader = getLeader(cluster);
+    Assertions.assertNotNull(leader);
+
+    ScmClient scmClient = new ContainerOperationClient(conf);
+    // assert that container balancer is not running right now
+    Assertions.assertFalse(scmClient.getContainerBalancerStatus());
+    ContainerBalancerConfiguration balancerConf =
+        conf.getObject(ContainerBalancerConfiguration.class);
+    ContainerBalancer containerBalancer = leader.getContainerBalancer();
+
+    /*
+    Start container balancer. Since this cluster is already balanced,
+    container balancer should exit early, stop, and persist configuration to DB.
+     */
+    containerBalancer.startBalancer(balancerConf);
+
+    // assert that balancer has stopped since the cluster is already balanced
+    GenericTestUtils.waitFor(() -> !containerBalancer.isBalancerRunning(),
+        10, 500);
+    Assertions.assertFalse(containerBalancer.isBalancerRunning());
+    // read persisted config once: the parsed proto is an immutable snapshot
+    ByteString byteString =
+        leader.getScmMetadataStore().getStatefulServiceConfigTable().get(
+            containerBalancer.getServiceName());
+    ContainerBalancerConfigurationProto proto =
+        ContainerBalancerConfigurationProto.parseFrom(byteString);
+    Assertions.assertFalse(proto.getShouldRun());
+
+    long leaderTermIndex =
+        leader.getScmHAManager().getRatisServer().getSCMStateMachine()
+        .getLastAppliedTermIndex().getIndex();
+
+    /*
+    Fetch persisted configuration to verify that `shouldRun` is set to false.
+     */
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      if (!scm.checkLeader()) {
+        // Wait for the follower to apply transactions up to the leader's
+        // last applied index.
+        // Fails with a timeout if the follower does not catch up within 3s
+        GenericTestUtils.waitFor(() -> scm.getScmHAManager().getRatisServer()
+            .getSCMStateMachine().getLastAppliedTermIndex()
+            .getIndex() >= leaderTermIndex, 100, 3000);
+        ContainerBalancer followerBalancer = scm.getContainerBalancer();
+        GenericTestUtils.waitFor(
+            () -> !followerBalancer.isBalancerRunning(), 50, 5000);
+        GenericTestUtils.waitFor(() -> !followerBalancer.shouldRun(), 100,
+            5000);
+      }
+      scm.getStatefulServiceStateManager().readConfiguration(
+          containerBalancer.getServiceName());
+      byteString =
+          scm.getScmMetadataStore().getStatefulServiceConfigTable().get(
+              containerBalancer.getServiceName());
+      ContainerBalancerConfigurationProto protobuf =
+          ContainerBalancerConfigurationProto.parseFrom(byteString);
+      Assertions.assertFalse(protobuf.getShouldRun());
+    }
+  }
+
   static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) {
     for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
       if (scm.checkLeader()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org