You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2022/06/01 12:19:45 UTC
[ozone] branch master updated: HDDS-6280. Support Container Balancer HA (#3423)
This is an automated email from the ASF dual-hosted git repository.
siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e8abd0f6e2 HDDS-6280. Support Container Balancer HA (#3423)
e8abd0f6e2 is described below
commit e8abd0f6e21dedf25672aaa6a2d638b8442680f0
Author: Siddhant Sangwan <si...@gmail.com>
AuthorDate: Wed Jun 1 17:49:40 2022 +0530
HDDS-6280. Support Container Balancer HA (#3423)
---
.../interface-client/src/main/proto/hdds.proto | 19 ++
.../scm/container/balancer/ContainerBalancer.java | 291 ++++++++++++++++-----
.../balancer/ContainerBalancerConfiguration.java | 84 ++++++
.../IllegalContainerBalancerStateException.java | 5 +-
...lidContainerBalancerConfigurationException.java | 11 +-
.../org/apache/hadoop/hdds/scm/ha/SCMService.java | 2 +-
.../hadoop/hdds/scm/ha/SCMServiceException.java | 44 ++++
.../hadoop/hdds/scm/ha/SCMServiceManager.java | 6 +-
.../apache/hadoop/hdds/scm/ha/StatefulService.java | 23 +-
.../scm/ha/StatefulServiceStateManagerImpl.java | 14 +
.../io/ByteStringCodec.java} | 33 +--
.../apache/hadoop/hdds/scm/ha/io/CodecFactory.java | 2 +
.../hdds/scm/server/SCMClientProtocolServer.java | 16 +-
.../hdds/scm/server/StorageContainerManager.java | 11 +-
.../container/balancer/TestContainerBalancer.java | 157 +++++++----
.../hadoop/ozone/scm/TestFailoverWithSCMHA.java | 93 +++++++
16 files changed, 660 insertions(+), 151 deletions(-)
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index c99274da75..01597a7b0e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -431,3 +431,22 @@ message ReplicationManagerReportProto {
repeated KeyIntValue stat = 2;
repeated KeyContainerIDList statSample = 3;
}
+
+message ContainerBalancerConfigurationProto {
+ optional string utilizationThreshold = 5;
+ optional int32 datanodesInvolvedMaxPercentagePerIteration = 6;
+ optional int64 sizeMovedMaxPerIteration = 7;
+ optional int64 sizeEnteringTargetMax = 8;
+ optional int64 sizeLeavingSourceMax = 9;
+ optional int32 iterations = 10;
+ optional string excludeContainers = 11;
+ optional int64 moveTimeout = 12;
+ optional int64 balancingIterationInterval = 13;
+ optional string includeDatanodes = 14;
+ optional string excludeDatanodes = 15;
+ optional bool moveNetworkTopologyEnable = 16;
+ optional bool triggerDuBeforeMoveEnable = 17;
+
+ required bool shouldRun = 18;
+ optional int32 nextIterationIndex = 19;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index c2d96ffbd2..77d9b5cc61 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.fs.DUFactory;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -33,7 +34,7 @@ import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.StatefulService;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -43,6 +44,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -65,7 +67,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DE
* Container balancer is a service in SCM to move containers between over- and
* under-utilized datanodes.
*/
-public class ContainerBalancer implements SCMService {
+public class ContainerBalancer extends StatefulService {
public static final Logger LOG =
LoggerFactory.getLogger(ContainerBalancer.class);
@@ -98,7 +100,6 @@ public class ContainerBalancer implements SCMService {
private NetworkTopology networkTopology;
private double upperLimit;
private double lowerLimit;
- private volatile boolean balancerRunning;
private volatile Thread currentBalancingThread;
private Lock lock;
private ContainerBalancerSelectionCriteria selectionCriteria;
@@ -110,15 +111,17 @@ public class ContainerBalancer implements SCMService {
CompletableFuture<LegacyReplicationManager.MoveResult>>
moveSelectionToFutureMap;
private IterationResult iterationResult;
+ private int nextIterationIndex;
/**
* Constructs ContainerBalancer with the specified arguments. Initializes
- * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
- * Container Balancer does not start on construction.
+ * ContainerBalancerMetrics. Container Balancer does not start on
+ * construction.
*
* @param scm the storage container manager
*/
public ContainerBalancer(StorageContainerManager scm) {
+ super(scm.getStatefulServiceStateManager());
this.nodeManager = scm.getScmNodeManager();
this.containerManager = scm.getContainerManager();
this.replicationManager = scm.getReplicationManager();
@@ -134,13 +137,16 @@ public class ContainerBalancer implements SCMService {
this.unBalancedNodes = new ArrayList<>();
this.placementPolicy = scm.getContainerPlacementPolicy();
this.networkTopology = scm.getClusterMap();
+ this.nextIterationIndex = 0;
this.lock = new ReentrantLock();
findSourceStrategy = new FindSourceGreedy(nodeManager);
+ scm.getSCMServiceManager().register(this);
}
/**
- * Balances the cluster.
+ * Balances the cluster in iterations. Regularly checks if balancing has
+ * been stopped.
*/
private void balance() {
this.iterations = config.getIterations();
@@ -149,7 +155,11 @@ public class ContainerBalancer implements SCMService {
this.iterations = Integer.MAX_VALUE;
}
- for (int i = 0; i < iterations && balancerRunning; i++) {
+ // nextIterationIndex is the iteration that balancer should start from on
+ // leader change or restart
+ int i = nextIterationIndex;
+ resetState();
+ for (; i < iterations && isBalancerRunning(); i++) {
if (config.getTriggerDuEnable()) {
// before starting a new iteration, we trigger all the datanode
// to run `du`. this is an aggressive action, with which we can
@@ -179,19 +189,37 @@ public class ContainerBalancer implements SCMService {
}
}
- // stop balancing if iteration is not initialized
+ // initialize this iteration. stop balancing on initialization failure
if (!initializeIteration()) {
- stopBalancer();
+ // just return if the reason for initialization failure is that
+ // balancer has been stopped in another thread
+ if (!isBalancerRunning()) {
+ return;
+ }
+ // otherwise, try to stop balancer
+ tryStopBalancer("Could not initialize ContainerBalancer's " +
+ "iteration number " + i);
return;
}
- //if no new move option is generated, it means the cluster can
- //not be balanced any more , so just stop
IterationResult iR = doIteration();
metrics.incrementNumIterations(1);
LOG.info("Result of this iteration of Container Balancer: {}", iR);
+
+ // persist next iteration index
+ if (iR == IterationResult.ITERATION_COMPLETED) {
+ try {
+ saveConfiguration(config, true, i + 1);
+ } catch (IOException e) {
+ LOG.warn("Could not persist next iteration index value for " +
+ "ContainerBalancer after completing an iteration", e);
+ }
+ }
+
+ // if no new move option is generated, it means the cluster cannot be
+ // balanced anymore; so just stop balancer
if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) {
- stopBalancer();
+ tryStopBalancer(iR.toString());
return;
}
@@ -215,7 +243,11 @@ public class ContainerBalancer implements SCMService {
}
}
}
- stopBalancer();
+
+ // finally, stop balancer if it hasn't been stopped already
+ if (isBalancerRunning()) {
+ tryStopBalancer("Completed all iterations.");
+ }
}
/**
@@ -238,12 +270,11 @@ public class ContainerBalancer implements SCMService {
List<DatanodeUsageInfo> datanodeUsageInfos =
nodeManager.getMostOrLeastUsedDatanodes(true);
if (datanodeUsageInfos.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Container Balancer could not retrieve nodes from Node " +
- "Manager.");
- }
+ LOG.warn("Received an empty list of datanodes from Node Manager when " +
+ "trying to identify which nodes to balance");
return false;
}
+
this.threshold = config.getThresholdAsRatio();
this.maxDatanodesRatioToInvolvePerIteration =
config.getMaxDatanodesRatioToInvolvePerIteration();
@@ -801,24 +832,72 @@ public class ContainerBalancer implements SCMService {
/**
* Receives a notification for raft or safe mode related status changes.
* Stops ContainerBalancer if it's running and the current SCM becomes a
- * follower or enters safe mode.
+ * follower or enters safe mode. Starts ContainerBalancer if the current
+ * SCM becomes leader, is out of safe mode and balancer should run.
*/
@Override
public void notifyStatusChanged() {
- if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
- if (isBalancerRunning()) {
- stopBalancingThread();
+ lock.lock();
+ try {
+ if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+ if (isBalancerRunning()) {
+ LOG.info("Stopping ContainerBalancer in this scm on status change");
+ stop();
+ }
+ } else {
+ if (shouldRun()) {
+ try {
+ LOG.info("Starting ContainerBalancer in this scm on status change");
+ start();
+ } catch (IllegalContainerBalancerStateException |
+ InvalidContainerBalancerConfigurationException e) {
+ LOG.warn("Could not start ContainerBalancer on raft/safe-mode " +
+ "status change.", e);
+ }
+ }
}
+ } finally {
+ lock.unlock();
}
}
/**
- * Checks if ContainerBalancer should run.
- * @return false
+ * Checks if ContainerBalancer should start (after a leader change, restart
+ * etc.) by reading persisted state.
+ * @return persisted shouldRun flag; false if it is missing or unreadable
*/
@Override
public boolean shouldRun() {
- return false;
+ try {
+ ContainerBalancerConfigurationProto proto =
+ readConfiguration(ContainerBalancerConfigurationProto.class);
+ if (proto == null) {
+ LOG.warn("Could not find persisted configuration for {} when checking" +
+ " if ContainerBalancer should run. ContainerBalancer should not " +
+ "run now.", this.getServiceName());
+ return false;
+ }
+ return proto.getShouldRun();
+ } catch (IOException e) {
+ LOG.warn("Could not read persisted configuration for checking if " +
+ "ContainerBalancer should start. ContainerBalancer should not start" +
+ " now.", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if ContainerBalancer is currently running in this SCM.
+ *
+ * @return true if the currentBalancingThread is not null, otherwise false
+ */
+ public boolean isBalancerRunning() {
+ lock.lock();
+ try {
+ return currentBalancingThread != null;
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -830,12 +909,47 @@ public class ContainerBalancer implements SCMService {
}
/**
- * Starts ContainerBalancer as an SCMService.
+ * Starts ContainerBalancer as an SCMService. Validates state, reads and
+ * validates persisted configuration, and then starts the balancing
+ * thread.
+ * @throws IllegalContainerBalancerStateException if state validation fails
+ * or balancer should not run according to persisted configuration
+ * @throws InvalidContainerBalancerConfigurationException if persisted
+ * configuration cannot be retrieved, is null, or fails validation
*/
@Override
- public void start() {
- if (shouldRun()) {
+ public void start() throws IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException {
+ lock.lock();
+ try {
+ // should be leader-ready, out of safe mode, and not running already
+ validateState(false);
+ ContainerBalancerConfigurationProto proto;
+ try {
+ proto = readConfiguration(ContainerBalancerConfigurationProto.class);
+ } catch (IOException e) {
+ throw new InvalidContainerBalancerConfigurationException("Could not " +
+ "retrieve persisted configuration while starting " +
+ "Container Balancer as an SCMService. Will not start now.", e);
+ }
+ if (proto == null) {
+ throw new InvalidContainerBalancerConfigurationException("Persisted " +
+ "configuration for ContainerBalancer is null during start. Will " +
+ "not start now.");
+ }
+ if (!proto.getShouldRun()) {
+ throw new IllegalContainerBalancerStateException("According to " +
+ "persisted configuration, ContainerBalancer should not run.");
+ }
+ ContainerBalancerConfiguration configuration =
+ ContainerBalancerConfiguration.fromProtobuf(proto,
+ ozoneConfiguration);
+ validateConfiguration(configuration);
+ this.config = configuration;
+ this.nextIterationIndex = proto.getNextIterationIndex();
startBalancingThread();
+ } finally {
+ lock.unlock();
}
}
@@ -848,13 +962,16 @@ public class ContainerBalancer implements SCMService {
* @throws InvalidContainerBalancerConfigurationException if
* {@link ContainerBalancerConfiguration} config file is incorrectly
* configured
+ * @throws IOException on failure to persist
+ * {@link ContainerBalancerConfiguration}
*/
- public void startBalancer() throws IllegalContainerBalancerStateException,
- InvalidContainerBalancerConfigurationException {
+ public void startBalancer(ContainerBalancerConfiguration configuration)
+ throws IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException, IOException {
lock.lock();
try {
- validateState();
- validateConfiguration(this.config);
+ // validates state, config, and then saves config
+ setBalancerConfigOnStartBalancer(configuration);
startBalancingThread();
} finally {
lock.unlock();
@@ -867,7 +984,6 @@ public class ContainerBalancer implements SCMService {
private void startBalancingThread() {
lock.lock();
try {
- balancerRunning = true;
currentBalancingThread = new Thread(this::balance);
currentBalancingThread.setName("ContainerBalancer");
currentBalancingThread.setDaemon(true);
@@ -879,11 +995,17 @@ public class ContainerBalancer implements SCMService {
}
/**
- * Checks if ContainerBalancer can start.
- * @throws IllegalContainerBalancerStateException if ContainerBalancer is
- * already running, SCM is in safe mode, or SCM is not leader ready
+ * Validates balancer's state based on the specified expectedRunning.
+ * Confirms SCM is leader-ready and out of safe mode.
+ *
+ * @param expectedRunning true if ContainerBalancer is expected to be
+ * running, else false
+ * @throws IllegalContainerBalancerStateException if SCM is not
+ * leader-ready, is in safe mode, or state does not match the specified
+ * expected state
*/
- private void validateState() throws IllegalContainerBalancerStateException {
+ private void validateState(boolean expectedRunning)
+ throws IllegalContainerBalancerStateException {
if (!scmContext.isLeaderReady()) {
LOG.warn("SCM is not leader ready");
throw new IllegalContainerBalancerStateException("SCM is not leader " +
@@ -895,10 +1017,10 @@ public class ContainerBalancer implements SCMService {
}
lock.lock();
try {
- if (isBalancerRunning() || currentBalancingThread != null) {
- LOG.warn("Cannot start ContainerBalancer because it's already running");
+ if (isBalancerRunning() != expectedRunning) {
throw new IllegalContainerBalancerStateException(
- "Cannot start ContainerBalancer because it's already running");
+ "Expect ContainerBalancer running state to be " + expectedRunning +
+ ", but running state is actually " + isBalancerRunning());
}
} finally {
lock.unlock();
@@ -910,20 +1032,49 @@ public class ContainerBalancer implements SCMService {
*/
@Override
public void stop() {
- stopBalancer();
+ lock.lock();
+ try {
+ if (!isBalancerRunning()) {
+ LOG.warn("Cannot stop Container Balancer because it's not running");
+ return;
+ }
+ stopBalancingThread();
+ } finally {
+ lock.unlock();
+ }
}
/**
* Stops ContainerBalancer gracefully.
*/
- public void stopBalancer() {
+ public void stopBalancer()
+ throws IOException, IllegalContainerBalancerStateException {
lock.lock();
try {
- if (!isBalancerRunning()) {
- LOG.info("Container Balancer is not running.");
- return;
- }
- stopBalancingThread();
+ // should be leader-ready, out of safe mode, and currently running
+ validateState(true);
+ saveConfiguration(config, false, 0);
+ stop();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Tries to stop ContainerBalancer. Logs the reason for stopping. Calls
+ * {@link ContainerBalancer#stopBalancer()} and only logs if that fails.
+ * @param stopReason a string specifying the reason for stopping
+ * ContainerBalancer.
+ */
+ private void tryStopBalancer(String stopReason) {
+ lock.lock();
+ try {
+ LOG.info("Stopping ContainerBalancer. Reason for stopping: {}",
+ stopReason);
+ stopBalancer();
+ } catch (IllegalContainerBalancerStateException | IOException e) {
+ LOG.warn("Tried to stop ContainerBalancer but failed. Reason for " +
+ "stopping: {}", stopReason, e);
} finally {
lock.unlock();
}
@@ -933,7 +1084,6 @@ public class ContainerBalancer implements SCMService {
Thread balancingThread;
lock.lock();
try {
- balancerRunning = false;
balancingThread = currentBalancingThread;
currentBalancingThread = null;
} finally {
@@ -952,6 +1102,20 @@ public class ContainerBalancer implements SCMService {
LOG.info("Container Balancer stopped successfully.");
}
+ private void saveConfiguration(ContainerBalancerConfiguration configuration,
+ boolean shouldRun, int index)
+ throws IOException {
+ lock.lock();
+ try {
+ saveConfiguration(configuration.toProtobufBuilder()
+ .setShouldRun(shouldRun)
+ .setNextIterationIndex(index)
+ .build());
+ } finally {
+ lock.unlock();
+ }
+ }
+
private void validateConfiguration(ContainerBalancerConfiguration conf)
throws InvalidContainerBalancerConfigurationException {
// maxSizeEnteringTarget and maxSizeLeavingSource should by default be
@@ -1007,12 +1171,24 @@ public class ContainerBalancer implements SCMService {
}
/**
- * Sets the configuration that ContainerBalancer will use. This should be
- * set before starting balancer.
- * @param config ContainerBalancerConfiguration
+ * Persists the configuration that ContainerBalancer will use after
+ * validating state and the specified configuration.
+ * @param configuration ContainerBalancerConfiguration to persist
+ * @throws InvalidContainerBalancerConfigurationException on failure to
+ * validate the specified configuration
+ * @throws IllegalContainerBalancerStateException if this SCM is not
+ * leader-ready, is in safe mode, or if ContainerBalancer is currently
+ * running in this SCM
+ * @throws IOException on failure to persist configuration
*/
- public void setConfig(ContainerBalancerConfiguration config) {
- this.config = config;
+ private void setBalancerConfigOnStartBalancer(
+ ContainerBalancerConfiguration configuration)
+ throws InvalidContainerBalancerConfigurationException,
+ IllegalContainerBalancerStateException, IOException {
+ validateState(false);
+ validateConfiguration(configuration);
+ saveConfiguration(configuration, true, 0);
+ this.config = configuration;
}
/**
@@ -1060,15 +1236,6 @@ public class ContainerBalancer implements SCMService {
return sourceToTargetMap;
}
- /**
- * Checks if ContainerBalancer is currently running.
- *
- * @return true if ContainerBalancer is running, false if not running.
- */
- public boolean isBalancerRunning() {
- return balancerRunning;
- }
-
@VisibleForTesting
int getCountDatanodesInvolvedPerIteration() {
return countDatanodesInvolvedPerIteration;
@@ -1096,7 +1263,7 @@ public class ContainerBalancer implements SCMService {
public String toString() {
String status = String.format("%nContainer Balancer status:%n" +
"%-30s %s%n" +
- "%-30s %b%n", "Key", "Value", "Running", balancerRunning);
+ "%-30s %b%n", "Key", "Value", "Running", isBalancerRunning());
return status + config.toString();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 4e994c8a38..9d5083768c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -22,8 +22,11 @@ import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -209,6 +212,10 @@ public final class ContainerBalancerConfiguration {
return triggerDuEnable;
}
+ public void setTriggerDuEnable(boolean enable) {
+ triggerDuEnable = enable;
+ }
+
/**
* Set the NetworkTopologyEnable value for Container Balancer.
*
@@ -315,6 +322,10 @@ public final class ContainerBalancerConfiguration {
this.moveTimeout = duration.toMillis();
}
+ public void setMoveTimeout(long millis) {
+ this.moveTimeout = millis;
+ }
+
public Duration getBalancingInterval() {
return Duration.ofMillis(balancingInterval);
}
@@ -323,6 +334,10 @@ public final class ContainerBalancerConfiguration {
this.balancingInterval = balancingInterval.toMillis();
}
+ public void setBalancingInterval(long millis) {
+ this.balancingInterval = millis;
+ }
+
/**
* Gets a set of datanode hostnames or ip addresses that will be the exclusive
* participants in balancing.
@@ -390,4 +405,73 @@ public final class ContainerBalancerConfiguration {
"Max Size Leaving Source per Iteration",
maxSizeLeavingSource / OzoneConsts.GB);
}
+
+ ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
+ ContainerBalancerConfigurationProto.Builder builder =
+ ContainerBalancerConfigurationProto.newBuilder();
+
+ builder.setUtilizationThreshold(threshold)
+ .setDatanodesInvolvedMaxPercentagePerIteration(
+ maxDatanodesPercentageToInvolvePerIteration)
+ .setSizeMovedMaxPerIteration(maxSizeToMovePerIteration)
+ .setSizeEnteringTargetMax(maxSizeEnteringTarget)
+ .setSizeLeavingSourceMax(maxSizeLeavingSource)
+ .setIterations(iterations)
+ .setExcludeContainers(excludeContainers)
+ .setMoveTimeout(moveTimeout)
+ .setBalancingIterationInterval(balancingInterval)
+ .setIncludeDatanodes(includeNodes)
+ .setExcludeDatanodes(excludeNodes)
+ .setMoveNetworkTopologyEnable(networkTopologyEnable)
+ .setTriggerDuBeforeMoveEnable(triggerDuEnable);
+ return builder;
+ }
+
+ static ContainerBalancerConfiguration fromProtobuf(
+ @NotNull ContainerBalancerConfigurationProto proto,
+ @NotNull OzoneConfiguration ozoneConfiguration) {
+ ContainerBalancerConfiguration config =
+ ozoneConfiguration.getObject(ContainerBalancerConfiguration.class);
+ if (proto.hasUtilizationThreshold()) {
+ config.setThreshold(Double.parseDouble(proto.getUtilizationThreshold()));
+ }
+ if (proto.hasDatanodesInvolvedMaxPercentagePerIteration()) {
+ config.setMaxDatanodesPercentageToInvolvePerIteration(
+ proto.getDatanodesInvolvedMaxPercentagePerIteration());
+ }
+ if (proto.hasSizeMovedMaxPerIteration()) {
+ config.setMaxSizeToMovePerIteration(proto.getSizeMovedMaxPerIteration());
+ }
+ if (proto.hasSizeEnteringTargetMax()) {
+ config.setMaxSizeEnteringTarget(proto.getSizeEnteringTargetMax());
+ }
+ if (proto.hasSizeLeavingSourceMax()) {
+ config.setMaxSizeLeavingSource(proto.getSizeLeavingSourceMax());
+ }
+ if (proto.hasIterations()) {
+ config.setIterations(proto.getIterations());
+ }
+ if (proto.hasExcludeContainers()) {
+ config.setExcludeContainers(proto.getExcludeContainers());
+ }
+ if (proto.hasMoveTimeout()) {
+ config.setMoveTimeout(proto.getMoveTimeout());
+ }
+ if (proto.hasBalancingIterationInterval()) {
+ config.setBalancingInterval(proto.getBalancingIterationInterval());
+ }
+ if (proto.hasIncludeDatanodes()) {
+ config.setIncludeNodes(proto.getIncludeDatanodes());
+ }
+ if (proto.hasExcludeDatanodes()) {
+ config.setExcludeNodes(proto.getExcludeDatanodes());
+ }
+ if (proto.hasMoveNetworkTopologyEnable()) {
+ config.setNetworkTopologyEnable(proto.getMoveNetworkTopologyEnable());
+ }
+ if (proto.hasTriggerDuBeforeMoveEnable()) {
+ config.setTriggerDuEnable(proto.getTriggerDuBeforeMoveEnable());
+ }
+ return config;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
index cc938e636f..b061ddf7c9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
@@ -18,10 +18,13 @@
package org.apache.hadoop.hdds.scm.container.balancer;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
/**
* Signals that a state change cannot be performed on ContainerBalancer.
*/
-public class IllegalContainerBalancerStateException extends Exception {
+public class IllegalContainerBalancerStateException extends
+ SCMServiceException {
/**
* Constructs an IllegalContainerBalancerStateException with no detail
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
index c6a6bf030b..9a4cc86ca4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
@@ -18,11 +18,16 @@
package org.apache.hadoop.hdds.scm.container.balancer;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
+import java.io.IOException;
+
/**
* Signals that {@link ContainerBalancerConfiguration} contains invalid
* configuration value(s).
*/
-public class InvalidContainerBalancerConfigurationException extends Exception {
+public class InvalidContainerBalancerConfigurationException extends
+ SCMServiceException {
/**
* Constructs an InvalidContainerBalancerConfigurationException with no detail
@@ -44,4 +49,8 @@ public class InvalidContainerBalancerConfigurationException extends Exception {
super(s);
}
+ public InvalidContainerBalancerConfigurationException(String s,
+ IOException e) {
+ super(s, e);
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
index 4d7c435b1e..2b185c9e4e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
@@ -66,7 +66,7 @@ public interface SCMService {
/**
* starts the SCM service.
*/
- void start();
+ void start() throws SCMServiceException;
/**
* stops the SCM service.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
new file mode 100644
index 0000000000..72fb7d25d2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * Checked exceptions thrown by an {@link SCMService}.
+ */
+public class SCMServiceException extends Exception {
+
+ /**
+ * Constructs a new exception with {@code null} as its detail message.
+ */
+ public SCMServiceException() {
+ super();
+ }
+
+ public SCMServiceException(String s) {
+ super(s);
+ }
+
+ public SCMServiceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SCMServiceException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
index 1b75c4feed..2ab8f8ea24 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
@@ -70,7 +70,11 @@ public final class SCMServiceManager {
public synchronized void start() {
for (SCMService service : services) {
LOG.debug("Stopping service:{}.", service.getServiceName());
- service.start();
+ try {
+ service.start();
+ } catch (SCMServiceException e) {
+ LOG.warn("Could not start " + service.getServiceName(), e);
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
index 441e83ce13..69df7c740f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdds.scm.ha;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -35,10 +33,11 @@ public abstract class StatefulService implements SCMService {
/**
* Initialize a StatefulService from an extending class.
- * @param scm {@link StorageContainerManager}
+ * @param stateManager a reference to the
+ * {@link StatefulServiceStateManager} from SCM.
*/
- protected StatefulService(StorageContainerManager scm) {
- stateManager = scm.getStatefulServiceStateManager();
+ protected StatefulService(StatefulServiceStateManager stateManager) {
+ this.stateManager = stateManager;
}
/**
@@ -60,21 +59,27 @@ public abstract class StatefulService implements SCMService {
*
* @param configType the Class object of the protobuf message type
* @param <T> the Type of the protobuf message
- * @return persisted protobuf message
+ * @return persisted protobuf message or null if the entry is not found
* @throws IOException on failure to fetch the message from DB or when
* parsing it. ensure the specified configType is correct
*/
protected final <T extends GeneratedMessage> T readConfiguration(
Class<T> configType) throws IOException {
+ ByteString byteString = stateManager.readConfiguration(getServiceName());
+ if (byteString == null) {
+ return null;
+ }
try {
return configType.cast(ReflectionUtil.getMethod(configType,
"parseFrom", ByteString.class)
- .invoke(null, stateManager.readConfiguration(getServiceName())));
+ .invoke(null, byteString));
} catch (NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
e.printStackTrace();
- throw new InvalidProtocolBufferException("GeneratedMessage cannot " +
- "be parsed for type " + configType + ": " + e.getMessage());
+ throw new IOException("GeneratedMessage cannot be parsed. Ensure that "
+ + configType + " is the correct expected message type for " +
+ this.getServiceName(), e);
}
+
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
index d470f1bf7f..1e7a756f41 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
@@ -23,6 +23,8 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Proxy;
@@ -34,6 +36,9 @@ import java.lang.reflect.Proxy;
public final class StatefulServiceStateManagerImpl
implements StatefulServiceStateManager {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(StatefulServiceStateManagerImpl.class);
+
// this table maps the service name to the configuration (ByteString)
private Table<String, ByteString> statefulServiceConfig;
private final DBTransactionBuffer transactionBuffer;
@@ -52,10 +57,19 @@ public final class StatefulServiceStateManagerImpl
public void saveConfiguration(String serviceName, ByteString bytes)
throws IOException {
transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added specified bytes to the transaction buffer for key " +
+ "{} to table {}", serviceName, statefulServiceConfig.getName());
+ }
+
if (transactionBuffer instanceof SCMHADBTransactionBuffer) {
SCMHADBTransactionBuffer buffer =
(SCMHADBTransactionBuffer) transactionBuffer;
buffer.flush();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transaction buffer flushed");
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
similarity index 52%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
index cc938e636f..e0cf00c52a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
@@ -15,32 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.hdds.scm.ha.io;
-package org.apache.hadoop.hdds.scm.container.balancer;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
/**
- * Signals that a state change cannot be performed on ContainerBalancer.
+ * A dummy codec that serializes a ByteString object to ByteString.
*/
-public class IllegalContainerBalancerStateException extends Exception {
+public class ByteStringCodec implements Codec {
- /**
- * Constructs an IllegalContainerBalancerStateException with no detail
- * message. A detail message is a String that describes this particular
- * exception.
- */
- public IllegalContainerBalancerStateException() {
- super();
+ @Override
+ public ByteString serialize(Object object)
+ throws InvalidProtocolBufferException {
+ return (ByteString) object;
}
- /**
- * Constructs an IllegalContainerBalancerStateException with the specified
- * detail message. A detail message is a String that describes this particular
- * exception.
- *
- * @param s the String that contains a detailed message
- */
- public IllegalContainerBalancerStateException(String s) {
- super(s);
+ @Override
+ public Object deserialize(Class<?> type, ByteString value)
+ throws InvalidProtocolBufferException {
+ return value;
}
-
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
index 9fb771b7a7..dae2b3ce6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.ha.io;
+import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolMessageEnum;
@@ -45,6 +46,7 @@ public final class CodecFactory {
codecs.put(Boolean.class, new BooleanCodec());
codecs.put(BigInteger.class, new BigIntegerCodec());
codecs.put(X509Certificate.class, new X509CertificateCodec());
+ codecs.put(ByteString.class, new ByteStringCodec());
}
private CodecFactory() { }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2d38331ceb..fcee862029 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -909,10 +909,9 @@ public class SCMClientProtocolServer implements
}
ContainerBalancer containerBalancer = scm.getContainerBalancer();
- containerBalancer.setConfig(cbc);
try {
- containerBalancer.startBalancer();
- } catch (IllegalContainerBalancerStateException |
+ containerBalancer.startBalancer(cbc);
+ } catch (IllegalContainerBalancerStateException | IOException |
InvalidContainerBalancerConfigurationException e) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
SCMAction.START_CONTAINER_BALANCER, null, e));
@@ -931,9 +930,14 @@ public class SCMClientProtocolServer implements
@Override
public void stopContainerBalancer() throws IOException {
getScm().checkAdminAccess(getRemoteUser());
- AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
- SCMAction.STOP_CONTAINER_BALANCER, null));
- scm.getContainerBalancer().stopBalancer();
+ try {
+ scm.getContainerBalancer().stopBalancer();
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+ SCMAction.STOP_CONTAINER_BALANCER, null));
+ } catch (IllegalContainerBalancerStateException e) {
+ AUDIT.logWriteFailure(buildAuditMessageForFailure(
+ SCMAction.STOP_CONTAINER_BALANCER, null, e));
+ }
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9e651c8dfd..4440d4a67d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1469,10 +1469,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
@Override
public void stop() {
try {
- LOG.info("Stopping Container Balancer service.");
- containerBalancer.stopBalancer();
+ if (containerBalancer.isBalancerRunning()) {
+ LOG.info("Stopping Container Balancer service.");
+ // stop ContainerBalancer thread in this scm
+ containerBalancer.stop();
+ } else {
+ LOG.info("Container Balancer is not running.");
+ }
} catch (Exception e) {
- LOG.error("Failed to stop Container Balancer service.");
+ LOG.error("Failed to stop Container Balancer service.", e);
}
try {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 4901617f9f..98c92b2242 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.balancer;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -38,8 +38,11 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -49,14 +52,13 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -96,21 +98,24 @@ public class TestContainerBalancer {
private Map<ContainerID, ContainerInfo> cidToInfoMap = new HashMap<>();
private Map<DatanodeUsageInfo, Set<ContainerID>> datanodeToContainersMap =
new HashMap<>();
+ private Map<String, ByteString> serviceToConfigMap = new HashMap<>();
private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
+ private StatefulServiceStateManager serviceStateManager;
/**
* Sets up configuration values and creates a mock cluster.
*/
@Before
- public void setup() throws SCMException, NodeNotFoundException {
+ public void setup() throws IOException, NodeNotFoundException {
conf = new OzoneConfiguration();
scm = Mockito.mock(StorageContainerManager.class);
containerManager = Mockito.mock(ContainerManager.class);
replicationManager = Mockito.mock(ReplicationManager.class);
+ serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
+ SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class);
+ // these configs will usually be specified in each test
balancerConfiguration =
conf.getObject(ContainerBalancerConfiguration.class);
balancerConfiguration.setThreshold(10);
@@ -161,7 +166,30 @@ public class TestContainerBalancer {
when(scm.getClusterMap()).thenReturn(null);
when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class));
when(scm.getConfiguration()).thenReturn(conf);
+ when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager);
+ when(scm.getSCMServiceManager()).thenReturn(scmServiceManager);
+ /*
+ When StatefulServiceStateManager#saveConfiguration is called, save to
+ in-memory serviceToConfigMap instead.
+ */
+ Mockito.doAnswer(i -> {
+ serviceToConfigMap.put(i.getArgument(0, String.class), i.getArgument(1,
+ ByteString.class));
+ return null;
+ }).when(serviceStateManager).saveConfiguration(
+ Mockito.any(String.class),
+ Mockito.any(ByteString.class));
+
+ /*
+ When StatefulServiceStateManager#readConfiguration is called, read from
+ serviceToConfigMap instead.
+ */
+ when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer(
+ i -> serviceToConfigMap.get(i.getArgument(0, String.class)));
+
+ Mockito.doNothing().when(scmServiceManager)
+ .register(Mockito.any(SCMService.class));
containerBalancer = new ContainerBalancer(scm);
}
@@ -184,7 +212,9 @@ public class TestContainerBalancer {
*/
@Test
public void
- initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() {
+ initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
List<DatanodeUsageInfo> expectedUnBalancedNodes;
List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
@@ -207,7 +237,7 @@ public class TestContainerBalancer {
unBalancedNodesAccordingToBalancer =
containerBalancer.getUnBalancedNodes();
- containerBalancer.stopBalancer();
+ stopBalancer();
Assert.assertEquals(
expectedUnBalancedNodes.size(),
unBalancedNodesAccordingToBalancer.size());
@@ -224,13 +254,15 @@ public class TestContainerBalancer {
* balanced.
*/
@Test
- public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
+ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(99.99);
startBalancer(balancerConfiguration);
sleepWhileBalancing(100);
- containerBalancer.stopBalancer();
+ stopBalancer();
ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
Assert.assertEquals(0, metrics.getNumDatanodesUnbalanced());
@@ -241,7 +273,9 @@ public class TestContainerBalancer {
* maxDatanodesRatioToInvolvePerIteration limit.
*/
@Test
- public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() {
+ public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
int percent = 20;
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
percent);
@@ -259,11 +293,13 @@ public class TestContainerBalancer {
Assert.assertTrue(metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
Assert.assertFalse(
metrics.getNumDatanodesInvolvedInLatestIteration() > number);
- containerBalancer.stopBalancer();
+ stopBalancer();
}
@Test
- public void containerBalancerShouldSelectOnlyClosedContainers() {
+ public void containerBalancerShouldSelectOnlyClosedContainers()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
// make all containers open, balancer should not select any of them
for (ContainerInfo containerInfo : cidToInfoMap.values()) {
containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
@@ -271,7 +307,7 @@ public class TestContainerBalancer {
balancerConfiguration.setThreshold(10);
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// balancer should have identified unbalanced nodes
Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
@@ -291,7 +327,7 @@ public class TestContainerBalancer {
}
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// check whether all selected containers are closed
for (ContainerMoveSelection moveSelection:
@@ -303,7 +339,9 @@ public class TestContainerBalancer {
}
@Test
- public void containerBalancerShouldObeyMaxSizeToMoveLimit() {
+ public void containerBalancerShouldObeyMaxSizeToMoveLimit()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(1);
balancerConfiguration.setMaxSizeToMovePerIteration(10 * OzoneConsts.GB);
balancerConfiguration.setIterations(1);
@@ -319,11 +357,13 @@ public class TestContainerBalancer {
.getDataSizeMovedGBInLatestIteration();
Assert.assertTrue(size > 0);
Assert.assertFalse(size > 10);
- containerBalancer.stopBalancer();
+ stopBalancer();
}
@Test
- public void targetDatanodeShouldNotAlreadyContainSelectedContainer() {
+ public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -336,7 +376,7 @@ public class TestContainerBalancer {
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
containerBalancer.getSourceToTargetMap();
for (ContainerMoveSelection moveSelection : sourceToTargetMap.values()) {
@@ -350,7 +390,9 @@ public class TestContainerBalancer {
}
@Test
- public void containerMoveSelectionShouldFollowPlacementPolicy() {
+ public void containerMoveSelectionShouldFollowPlacementPolicy()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -363,7 +405,7 @@ public class TestContainerBalancer {
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
containerBalancer.getSourceToTargetMap();
@@ -392,7 +434,8 @@ public class TestContainerBalancer {
@Test
public void targetDatanodeShouldBeInServiceHealthy()
- throws NodeNotFoundException {
+ throws NodeNotFoundException, IllegalContainerBalancerStateException,
+ IOException, InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -407,7 +450,7 @@ public class TestContainerBalancer {
} catch (InterruptedException e) {
}
- containerBalancer.stopBalancer();
+ stopBalancer();
for (ContainerMoveSelection moveSelection :
containerBalancer.getSourceToTargetMap().values()) {
DatanodeDetails target = moveSelection.getTargetNode();
@@ -419,7 +462,9 @@ public class TestContainerBalancer {
}
@Test
- public void selectedContainerShouldNotAlreadyHaveBeenSelected() {
+ public void selectedContainerShouldNotAlreadyHaveBeenSelected()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -434,7 +479,7 @@ public class TestContainerBalancer {
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Set<ContainerID> containers = new HashSet<>();
for (ContainerMoveSelection moveSelection :
containerBalancer.getSourceToTargetMap().values()) {
@@ -445,7 +490,9 @@ public class TestContainerBalancer {
}
@Test
- public void balancerShouldNotSelectConfiguredExcludeContainers() {
+ public void balancerShouldNotSelectConfiguredExcludeContainers()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -461,7 +508,7 @@ public class TestContainerBalancer {
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Set<ContainerID> excludeContainers =
balancerConfiguration.getExcludeContainers();
for (ContainerMoveSelection moveSelection :
@@ -472,7 +519,9 @@ public class TestContainerBalancer {
}
@Test
- public void balancerShouldObeyMaxSizeEnteringTargetLimit() {
+ public void balancerShouldObeyMaxSizeEnteringTargetLimit()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
conf.set("ozone.scm.container.size", "1MB");
balancerConfiguration =
conf.getObject(ContainerBalancerConfiguration.class);
@@ -487,7 +536,7 @@ public class TestContainerBalancer {
Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
Assert.assertTrue(containerBalancer.getSourceToTargetMap().isEmpty());
- containerBalancer.stopBalancer();
+ stopBalancer();
// some containers should be selected when using default values
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
@@ -497,26 +546,28 @@ public class TestContainerBalancer {
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// balancer should have identified unbalanced nodes
Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
Assert.assertFalse(containerBalancer.getSourceToTargetMap().isEmpty());
}
@Test
- public void testMetrics() {
+ public void testMetrics()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
conf.set("hdds.datanode.du.refresh.period", "1ms");
balancerConfiguration.setBalancingInterval(Duration.ofMillis(2));
balancerConfiguration.setThreshold(10);
balancerConfiguration.setIterations(1);
balancerConfiguration.setMaxSizeEnteringTarget(6 * OzoneConsts.GB);
- // deliberately set max size per iteration to a low value, 6GB
+ // deliberately set max size per iteration to a low value, 6 GB
balancerConfiguration.setMaxSizeToMovePerIteration(6 * OzoneConsts.GB);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
Assert.assertEquals(determineExpectedUnBalancedNodes(
@@ -535,7 +586,9 @@ public class TestContainerBalancer {
* exclude configurations, then it should be excluded.
*/
@Test
- public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() {
+ public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setIterations(1);
balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -569,7 +622,7 @@ public class TestContainerBalancer {
balancerConfiguration.setIncludeNodes(includeNodes);
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// finally, these should be the only nodes included in balancing
// (included - excluded)
@@ -606,7 +659,9 @@ public class TestContainerBalancer {
@Test
public void checkIterationResult()
- throws NodeNotFoundException, ContainerNotFoundException {
+ throws NodeNotFoundException, IOException,
+ IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setIterations(1);
balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -622,7 +677,7 @@ public class TestContainerBalancer {
*/
Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
containerBalancer.getIterationResult());
- containerBalancer.stop();
+ stopBalancer();
/*
Now, limit maxSizeToMovePerIteration but fail all container moves. The
@@ -640,12 +695,14 @@ public class TestContainerBalancer {
Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
containerBalancer.getIterationResult());
- containerBalancer.stop();
+ stopBalancer();
}
@Test
public void checkIterationResultTimeout()
- throws NodeNotFoundException, ContainerNotFoundException {
+ throws NodeNotFoundException, IOException,
+ IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException {
Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
Mockito.any(DatanodeDetails.class),
@@ -673,7 +730,7 @@ public class TestContainerBalancer {
.getNumContainerMovesCompletedInLatestIteration());
Assert.assertTrue(containerBalancer.getMetrics()
.getNumContainerMovesTimeoutInLatestIteration() > 1);
- containerBalancer.stop();
+ stopBalancer();
}
@@ -859,13 +916,19 @@ public class TestContainerBalancer {
}
}
- private void startBalancer(ContainerBalancerConfiguration config) {
- containerBalancer.setConfig(config);
+ private void startBalancer(ContainerBalancerConfiguration config)
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
+ containerBalancer.startBalancer(config);
+ }
+
+ private void stopBalancer() {
try {
- containerBalancer.startBalancer();
- } catch (IllegalContainerBalancerStateException |
- InvalidContainerBalancerConfigurationException e) {
- LOG.info("Could not start ContainerBalancer while testing", e);
+ if (containerBalancer.isBalancerRunning()) {
+ containerBalancer.stopBalancer();
+ }
+ } catch (IOException | IllegalContainerBalancerStateException e) {
+ LOG.warn("Failed to stop balancer", e);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
index b4882ac980..906b2aaf70 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
@@ -16,10 +16,17 @@
*/
package org.apache.hadoop.ozone.scm;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration;
+import org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException;
+import org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException;
import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -36,6 +43,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -44,7 +52,9 @@ import org.slf4j.event.Level;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -226,6 +236,89 @@ public class TestFailoverWithSCMHA {
Assert.assertFalse(inflightMove.containsKey(id));
}
+ /**
+ * Starts ContainerBalancer when the cluster is already balanced.
+ * ContainerBalancer will identify that no unbalanced nodes are present and
+ * exit and stop in the first iteration. We test that ContainerBalancer
+ * persists ContainerBalancerConfigurationProto#shouldRun as false in all
+ * the 3 SCMs when it stops.
+ * @throws IOException
+ * @throws IllegalContainerBalancerStateException
+ * @throws InvalidContainerBalancerConfigurationException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void testContainerBalancerPersistsConfigurationInAllSCMs()
+ throws IOException, IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException, InterruptedException,
+ TimeoutException {
+ SCMClientConfig scmClientConfig =
+ conf.getObject(SCMClientConfig.class);
+ scmClientConfig.setRetryInterval(100);
+ scmClientConfig.setMaxRetryTimeout(1500);
+ Assertions.assertEquals(15, scmClientConfig.getRetryCount());
+ conf.setFromObject(scmClientConfig);
+ StorageContainerManager leader = getLeader(cluster);
+ Assertions.assertNotNull(leader);
+
+ ScmClient scmClient = new ContainerOperationClient(conf);
+ // assert that container balancer is not running right now
+ Assertions.assertFalse(scmClient.getContainerBalancerStatus());
+ ContainerBalancerConfiguration balancerConf =
+ conf.getObject(ContainerBalancerConfiguration.class);
+ ContainerBalancer containerBalancer = leader.getContainerBalancer();
+
+ /*
+ Start container balancer. Since this cluster is already balanced,
+ container balancer should exit early, stop, and persist configuration to DB.
+ */
+ containerBalancer.startBalancer(balancerConf);
+
+ // assert that balancer has stopped since the cluster is already balanced
+ GenericTestUtils.waitFor(() -> !containerBalancer.isBalancerRunning(),
+ 10, 500);
+ Assertions.assertFalse(containerBalancer.isBalancerRunning());
+
+ ByteString byteString =
+ leader.getScmMetadataStore().getStatefulServiceConfigTable().get(
+ containerBalancer.getServiceName());
+ ContainerBalancerConfigurationProto proto =
+ ContainerBalancerConfigurationProto.parseFrom(byteString);
+ GenericTestUtils.waitFor(() -> !proto.getShouldRun(), 5, 50);
+
+ long leaderTermIndex =
+ leader.getScmHAManager().getRatisServer().getSCMStateMachine()
+ .getLastAppliedTermIndex().getIndex();
+
+ /*
+ Fetch persisted configuration to verify that `shouldRun` is set to false.
+ */
+ for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+ if (!scm.checkLeader()) {
+ // Wait until the follower has applied transactions up to the leader's
+ // last applied index.
+ // waitFor times out if the follower does not catch up within 3s.
+ GenericTestUtils.waitFor(() -> scm.getScmHAManager().getRatisServer()
+ .getSCMStateMachine().getLastAppliedTermIndex()
+ .getIndex() >= leaderTermIndex, 100, 3000);
+ ContainerBalancer followerBalancer = scm.getContainerBalancer();
+ GenericTestUtils.waitFor(
+ () -> !followerBalancer.isBalancerRunning(), 50, 5000);
+ GenericTestUtils.waitFor(() -> !followerBalancer.shouldRun(), 100,
+ 5000);
+ }
+ scm.getStatefulServiceStateManager().readConfiguration(
+ containerBalancer.getServiceName());
+ byteString =
+ scm.getScmMetadataStore().getStatefulServiceConfigTable().get(
+ containerBalancer.getServiceName());
+ ContainerBalancerConfigurationProto protobuf =
+ ContainerBalancerConfigurationProto.parseFrom(byteString);
+ Assertions.assertFalse(protobuf.getShouldRun());
+ }
+ }
+
static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) {
for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
if (scm.checkLeader()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org