You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/05/04 16:31:41 UTC
[ozone] branch master updated: HDDS-6589. Add a new replication manager and change the existing one to legacy (#3352)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c99fd20da9 HDDS-6589. Add a new replication manager and change the existing one to legacy (#3352)
c99fd20da9 is described below
commit c99fd20da9fa2717f182531617f739335f9b7725
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Thu May 5 00:31:34 2022 +0800
HDDS-6589. Add a new replication manager and change the existing one to legacy (#3352)
---
.../hdds/scm/container/ContainerReplicaCount.java | 3 +-
.../scm/container/balancer/ContainerBalancer.java | 14 +-
.../ContainerBalancerSelectionCriteria.java | 2 +-
...tivityStatusMXBean.java => InflightAction.java} | 27 +-
.../LegacyReplicationManager.java} | 359 ++++------------
.../replication/ReplicationActivityStatus.java | 104 -----
.../container/replication/ReplicationManager.java | 450 +++++++++++++++++++++
.../replication/ReplicationManagerMetrics.java | 1 -
.../hdds/scm/node/DatanodeAdminMonitorImpl.java | 2 +-
.../hdds/scm/node/NodeDecommissionManager.java | 2 +-
.../scm/server/OzoneStorageContainerManager.java | 2 +-
.../hadoop/hdds/scm/server/SCMConfigurator.java | 2 +-
.../hdds/scm/server/StorageContainerManager.java | 2 +-
.../hdds/scm/container/TestReplicationManager.java | 61 ++-
.../container/balancer/TestContainerBalancer.java | 13 +-
.../replication/TestReplicationManagerMetrics.java | 1 -
.../hdds/scm/node/TestDatanodeAdminMonitor.java | 2 +-
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 2 +-
.../ozone/insight/scm/ReplicaManagerInsight.java | 2 +-
.../safemode/TestSCMSafeModeWithPipelineRules.java | 2 +-
.../hadoop/ozone/TestStorageContainerManager.java | 88 ++--
.../rpc/TestContainerReplicationEndToEnd.java | 2 +-
.../ozone/client/rpc/read/TestInputStreamBase.java | 2 +-
.../ozone/container/TestContainerReplication.java | 2 +-
.../commandhandler/TestBlockDeletion.java | 6 +-
.../hadoop/ozone/scm/TestCloseContainer.java | 2 +-
.../scm/node/TestDecommissionAndMaintenance.java | 2 +-
.../scm/ReconStorageContainerManagerFacade.java | 2 +-
28 files changed, 653 insertions(+), 506 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
index 3dab4ad83f..ec5a87a890 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
import java.util.Set;
@@ -266,7 +267,7 @@ public class ContainerReplicaCount {
|| container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED)
&& replica.stream()
.filter(r -> r.getDatanodeDetails().getPersistedOpState() == IN_SERVICE)
- .allMatch(r -> ReplicationManager.compareState(
+ .allMatch(r -> LegacyReplicationManager.compareState(
container.getState(), r.getState()));
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 09d289e817..c2d96ffbd2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -106,7 +107,7 @@ public class ContainerBalancer implements SCMService {
private FindTargetStrategy findTargetStrategy;
private FindSourceStrategy findSourceStrategy;
private Map<ContainerMoveSelection,
- CompletableFuture<ReplicationManager.MoveResult>>
+ CompletableFuture<LegacyReplicationManager.MoveResult>>
moveSelectionToFutureMap;
private IterationResult iterationResult;
@@ -461,7 +462,6 @@ public class ContainerBalancer implements SCMService {
*/
private void checkIterationMoveResults(Set<DatanodeDetails> selectedTargets) {
this.countDatanodesInvolvedPerIteration = 0;
-
CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(
moveSelectionToFutureMap.values()
.toArray(new CompletableFuture[moveSelectionToFutureMap.size()]));
@@ -586,7 +586,7 @@ public class ContainerBalancer implements SCMService {
private boolean moveContainer(DatanodeDetails source,
ContainerMoveSelection moveSelection) {
ContainerID containerID = moveSelection.getContainerID();
- CompletableFuture<ReplicationManager.MoveResult> future;
+ CompletableFuture<LegacyReplicationManager.MoveResult> future;
try {
ContainerInfo containerInfo = containerManager.getContainer(containerID);
future = replicationManager
@@ -599,7 +599,7 @@ public class ContainerBalancer implements SCMService {
source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), ex);
} else {
- if (result == ReplicationManager.MoveResult.COMPLETED) {
+ if (result == LegacyReplicationManager.MoveResult.COMPLETED) {
metrics.incrementDataSizeMovedGBInLatestIteration(
containerInfo.getUsedBytes() / OzoneConsts.GB);
metrics.incrementNumContainerMovesCompletedInLatestIteration(1);
@@ -630,9 +630,9 @@ public class ContainerBalancer implements SCMService {
if (future.isCompletedExceptionally()) {
return false;
} else {
- ReplicationManager.MoveResult result = future.join();
+ LegacyReplicationManager.MoveResult result = future.join();
moveSelectionToFutureMap.put(moveSelection, future);
- return result == ReplicationManager.MoveResult.COMPLETED;
+ return result == LegacyReplicationManager.MoveResult.COMPLETED;
}
} else {
moveSelectionToFutureMap.put(moveSelection, future);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
index b5f5acd8fa..bfcca778a4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.slf4j.Logger;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightAction.java
similarity index 58%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightAction.java
index 164bd247ef..153d5dc2c6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightAction.java
@@ -17,12 +17,29 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
/**
- * JMX interface to monitor replication status.
- */
-public interface ReplicationActivityStatusMXBean {
+* InflightAction is a Wrapper class to hold the InflightAction
+* with its start time and the target datanode.
+*/
+public class InflightAction {
+ private final DatanodeDetails datanode;
+ private final long time;
+
+ public InflightAction(final DatanodeDetails datanode,
+ final long time) {
+ this.datanode = datanode;
+ this.time = time;
+ }
- boolean isReplicationEnabled();
+ @VisibleForTesting
+ public DatanodeDetails getDatanode() {
+ return datanode;
+ }
- void setReplicationEnabled(boolean enabled);
+ public long getTime() {
+ return time;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
similarity index 89%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 279163b8f3..47ac3d3541 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -16,34 +16,9 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.container;
+package org.apache.hadoop.hdds.scm.container.replication;
-import java.io.IOException;
-import java.lang.reflect.Proxy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.HddsConfigKeys;
+import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
@@ -52,31 +27,34 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
-import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import static org.apache.hadoop.hdds.protocol.proto.
- SCMRatisProtocol.RequestType.MOVE;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
-import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
@@ -85,30 +63,44 @@ import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
-
-import com.google.protobuf.GeneratedMessage;
-import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
-import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
-
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+import static org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType.MOVE;
+
/**
- * Replication Manager (RM) is the one which is responsible for making sure
- * that the containers are properly replicated. Replication Manager deals only
- * with Quasi Closed / Closed container.
+ * Legacy Replication Manager (RM) is a legacy , which is used to process
+ * non-EC container, and hopefully to be replaced int the future.
*/
-public class ReplicationManager implements SCMService {
+public class LegacyReplicationManager {
public static final Logger LOG =
- LoggerFactory.getLogger(ReplicationManager.class);
-
- public static final String METRICS_SOURCE_NAME = "SCMReplicationManager";
+ LoggerFactory.getLogger(LegacyReplicationManager.class);
/**
* Reference to the ContainerManager.
@@ -211,40 +203,19 @@ public class ReplicationManager implements SCMService {
*/
private final ReplicationManagerConfiguration rmConf;
- /**
- * ReplicationMonitor thread is the one which wakes up at configured
- * interval and processes all the containers.
- */
- private Thread replicationMonitor;
-
- /**
- * Flag used for checking if the ReplicationMonitor thread is running or
- * not.
- */
- private volatile boolean running;
-
/**
* Minimum number of replica in a healthy state for maintenance.
*/
private int minHealthyForMaintenance;
+ private final Clock clock;
+
/**
* Current container size as a bound for choosing datanodes with
* enough space for a replica.
*/
private long currentContainerSize;
- /**
- * SCMService related variables.
- * After leaving safe mode, replicationMonitor needs to wait for a while
- * before really take effect.
- */
- private final Lock serviceLock = new ReentrantLock();
- private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
- private final long waitTimeInMillis;
- private long lastTimeToBeReadyInMillis = 0;
- private final Clock clock;
-
/**
* Replication progress related metrics.
*/
@@ -255,10 +226,6 @@ public class ReplicationManager implements SCMService {
*/
private final MoveScheduler moveScheduler;
- /**
- * Report object that is refreshed each time replication Manager runs.
- */
- private ReplicationManagerReport containerReport;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -269,16 +236,15 @@ public class ReplicationManager implements SCMService {
* @param eventPublisher EventPublisher
*/
@SuppressWarnings("parameternumber")
- public ReplicationManager(final ConfigurationSource conf,
- final ContainerManager containerManager,
- final PlacementPolicy containerPlacement,
- final EventPublisher eventPublisher,
- final SCMContext scmContext,
- final SCMServiceManager serviceManager,
- final NodeManager nodeManager,
- final java.time.Clock clock,
- final SCMHAManager scmhaManager,
- final Table<ContainerID, MoveDataNodePair> moveTable)
+ public LegacyReplicationManager(final ConfigurationSource conf,
+ final ContainerManager containerManager,
+ final PlacementPolicy containerPlacement,
+ final EventPublisher eventPublisher,
+ final SCMContext scmContext,
+ final NodeManager nodeManager,
+ final SCMHAManager scmhaManager,
+ final Clock clock,
+ final Table<ContainerID, MoveDataNodePair> moveTable)
throws IOException {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
@@ -286,18 +252,12 @@ public class ReplicationManager implements SCMService {
this.scmContext = scmContext;
this.nodeManager = nodeManager;
this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
- this.running = false;
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
this.inflightMoveFuture = new ConcurrentHashMap<>();
this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
this.clock = clock;
- this.containerReport = new ReplicationManagerReport();
- this.waitTimeInMillis = conf.getTimeDuration(
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
- TimeUnit.MILLISECONDS);
this.currentContainerSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
@@ -308,109 +268,16 @@ public class ReplicationManager implements SCMService {
.setDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
.setRatisServer(scmhaManager.getRatisServer())
.setMoveTable(moveTable).build();
-
- // register ReplicationManager to SCMServiceManager.
- serviceManager.register(this);
-
- // start ReplicationManager.
- start();
- }
-
- /**
- * Starts Replication Monitor thread.
- */
- @Override
- public synchronized void start() {
- if (!isRunning()) {
- metrics = ReplicationManagerMetrics.create(this);
- LOG.info("Starting Replication Monitor Thread.");
- running = true;
- replicationMonitor = new Thread(this::run);
- replicationMonitor.setName("ReplicationMonitor");
- replicationMonitor.setDaemon(true);
- replicationMonitor.start();
- } else {
- LOG.info("Replication Monitor Thread is already running.");
- }
- }
-
- /**
- * Returns true if the Replication Monitor Thread is running.
- *
- * @return true if running, false otherwise
- */
- public boolean isRunning() {
- if (!running) {
- synchronized (this) {
- return replicationMonitor != null
- && replicationMonitor.isAlive();
- }
- }
- return true;
}
- /**
- * Stops Replication Monitor thread.
- */
- public synchronized void stop() {
- if (running) {
- LOG.info("Stopping Replication Monitor Thread.");
- inflightReplication.clear();
- inflightDeletion.clear();
- running = false;
- metrics.unRegister();
- notifyAll();
- } else {
- LOG.info("Replication Monitor Thread is not running.");
- }
- }
-
- /**
- * Process all the containers now, and wait for the processing to complete.
- * This in intended to be used in tests.
- */
- public synchronized void processAll() {
- if (!shouldRun()) {
- LOG.info("Replication Manager is not ready to run until {}ms after " +
- "safemode exit", waitTimeInMillis);
- return;
- }
- final long start = clock.millis();
- final List<ContainerInfo> containers =
- containerManager.getContainers();
- ReplicationManagerReport report = new ReplicationManagerReport();
- for (ContainerInfo c : containers) {
- processContainer(c, report);
- }
- report.setComplete();
- containerReport = report;
- LOG.info("Replication Monitor Thread took {} milliseconds for" +
- " processing {} containers.", clock.millis() - start,
- containers.size());
- }
- public ReplicationManagerReport getContainerReport() {
- return containerReport;
+ protected synchronized void clearInflightActions() {
+ inflightReplication.clear();
+ inflightDeletion.clear();
}
- /**
- * ReplicationMonitor thread runnable. This wakes up at configured
- * interval and processes all the containers in the system.
- */
- private synchronized void run() {
- try {
- while (running) {
- processAll();
- wait(rmConf.getInterval());
- }
- } catch (Throwable t) {
- // When we get runtime exception, we should terminate SCM.
- LOG.error("Exception in Replication Monitor Thread.", t);
- if (t instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- ExitUtil.terminate(1, t);
- }
+ protected synchronized void setMetrics(ReplicationManagerMetrics metrics) {
+ this.metrics = metrics;
}
/**
@@ -419,11 +286,8 @@ public class ReplicationManager implements SCMService {
* @param container ContainerInfo
*/
@SuppressWarnings("checkstyle:methodlength")
- private void processContainer(ContainerInfo container,
+ protected void processContainer(ContainerInfo container,
ReplicationManagerReport report) {
- if (!shouldRun()) {
- return;
- }
final ContainerID id = container.containerID();
try {
// synchronize on the containerInfo object to solve container
@@ -492,14 +356,14 @@ public class ReplicationManager implements SCMService {
* list, if the operation is completed or if it has timed out.
*/
updateInflightAction(container, inflightReplication,
- action -> replicas.stream()
- .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
+ action -> replicas.stream().anyMatch(
+ r -> r.getDatanodeDetails().equals(action.getDatanode())),
() -> metrics.incrNumReplicationCmdsTimeout(),
action -> updateCompletedReplicationMetrics(container, action));
updateInflightAction(container, inflightDeletion,
- action -> replicas.stream()
- .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
+ action -> replicas.stream().noneMatch(
+ r -> r.getDatanodeDetails().equals(action.getDatanode())),
() -> metrics.incrNumDeletionCmdsTimeout(),
action -> updateCompletedDeletionMetrics(container, action));
@@ -601,14 +465,14 @@ public class ReplicationManager implements SCMService {
InflightAction action) {
metrics.incrNumReplicationCmdsCompleted();
metrics.incrNumReplicationBytesCompleted(container.getUsedBytes());
- metrics.addReplicationTime(clock.millis() - action.time);
+ metrics.addReplicationTime(clock.millis() - action.getTime());
}
private void updateCompletedDeletionMetrics(ContainerInfo container,
InflightAction action) {
metrics.incrNumDeletionCmdsCompleted();
metrics.incrNumDeletionBytesCompleted(container.getUsedBytes());
- metrics.addDeletionTime(clock.millis() - action.time);
+ metrics.addDeletionTime(clock.millis() - action.getTime());
}
/**
@@ -634,10 +498,10 @@ public class ReplicationManager implements SCMService {
while (iter.hasNext()) {
try {
InflightAction a = iter.next();
- NodeStatus status = nodeManager.getNodeStatus(a.datanode);
+ NodeStatus status = nodeManager.getNodeStatus(a.getDatanode());
boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
boolean isCompleted = filter.test(a);
- boolean isTimeout = a.time < deadline;
+ boolean isTimeout = a.getTime() < deadline;
boolean isNotInService = status.getOperationalState() !=
NodeOperationalState.IN_SERVICE;
if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
@@ -650,7 +514,7 @@ public class ReplicationManager implements SCMService {
}
updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
- isNotInService, container, a.datanode, inflightActions);
+ isNotInService, container, a.getDatanode(), inflightActions);
}
} catch (NodeNotFoundException | ContainerNotFoundException e) {
// Should not happen, but if it does, just remove the action as the
@@ -781,14 +645,10 @@ public class ReplicationManager implements SCMService {
* @param cid Container to move
* @param mp MoveDataNodePair which contains source and target datanodes
*/
- public CompletableFuture<MoveResult> move(ContainerID cid,
+ private CompletableFuture<MoveResult> move(ContainerID cid,
MoveDataNodePair mp)
throws ContainerNotFoundException, NodeNotFoundException {
CompletableFuture<MoveResult> ret = new CompletableFuture<>();
- if (!isRunning()) {
- ret.complete(MoveResult.FAIL_NOT_RUNNING);
- return ret;
- }
if (!scmContext.isLeader()) {
ret.complete(MoveResult.FAIL_NOT_LEADER);
@@ -1096,7 +956,7 @@ public class ReplicationManager implements SCMService {
final List<DatanodeDetails> deletionInFlight = inflightDeletion
.getOrDefault(container.containerID(), Collections.emptyList())
.stream()
- .map(action -> action.datanode)
+ .map(action -> action.getDatanode())
.collect(Collectors.toList());
Set<ContainerReplica> filteredReplicas = replicas.stream().filter(
r -> !deletionInFlight.contains(r.getDatanodeDetails()))
@@ -1175,12 +1035,12 @@ public class ReplicationManager implements SCMService {
final List<DatanodeDetails> deletionInFlight = inflightDeletion
.getOrDefault(id, Collections.emptyList())
.stream()
- .map(action -> action.datanode)
+ .map(action -> action.getDatanode())
.collect(Collectors.toList());
final List<DatanodeDetails> replicationInFlight = inflightReplication
.getOrDefault(id, Collections.emptyList())
.stream()
- .map(action -> action.datanode)
+ .map(action -> action.getDatanode())
.collect(Collectors.toList());
final List<DatanodeDetails> source = replicas.stream()
.filter(r ->
@@ -1314,7 +1174,7 @@ public class ReplicationManager implements SCMService {
// also many not be available
eligibleReplicas.removeIf(r ->
r.getDatanodeDetails().getPersistedOpState() !=
- HddsProtos.NodeOperationalState.IN_SERVICE);
+ NodeOperationalState.IN_SERVICE);
final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
.stream()
@@ -1704,7 +1564,7 @@ public class ReplicationManager implements SCMService {
ContainerInfo container, Set<ContainerReplica> replicas) {
LifeCycleState state = container.getState();
return replicas.stream()
- .allMatch(r -> ReplicationManager.compareState(state, r.getState()));
+ .allMatch(r -> compareState(state, r.getState()));
}
public boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
@@ -1712,26 +1572,6 @@ public class ReplicationManager implements SCMService {
inflightDeletion.containsKey(containerID);
}
- /**
- * Wrapper class to hold the InflightAction with its start time.
- */
- static final class InflightAction {
-
- private final DatanodeDetails datanode;
- private final long time;
-
- private InflightAction(final DatanodeDetails datanode,
- final long time) {
- this.datanode = datanode;
- this.time = time;
- }
-
- @VisibleForTesting
- public DatanodeDetails getDatanode() {
- return datanode;
- }
- }
-
/**
* Configuration used by the Replication Manager.
*/
@@ -1803,49 +1643,10 @@ public class ReplicationManager implements SCMService {
}
}
- @Override
- public void notifyStatusChanged() {
- serviceLock.lock();
- try {
- // 1) SCMContext#isLeaderReady returns true.
- // 2) not in safe mode.
- if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
- // transition from PAUSING to RUNNING
- if (serviceStatus != ServiceStatus.RUNNING) {
- LOG.info("Service {} transitions to RUNNING.", getServiceName());
- lastTimeToBeReadyInMillis = clock.millis();
- serviceStatus = ServiceStatus.RUNNING;
- }
- //now, as the current scm is leader and it`s state is up-to-date,
- //we need to take some action about replicated inflight move options.
- onLeaderReadyAndOutOfSafeMode();
- } else {
- serviceStatus = ServiceStatus.PAUSING;
- }
- } finally {
- serviceLock.unlock();
- }
- }
-
- @Override
- public boolean shouldRun() {
- serviceLock.lock();
- try {
- // If safe mode is off, then this SCMService starts to run with a delay.
- return serviceStatus == ServiceStatus.RUNNING &&
- clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
- } finally {
- serviceLock.unlock();
- }
- }
-
- @Override
- public String getServiceName() {
- return ReplicationManager.class.getSimpleName();
- }
-
- public ReplicationManagerMetrics getMetrics() {
- return this.metrics;
+ protected void notifyStatusChanged() {
+ //now, as the current scm is leader and it`s state is up-to-date,
+ //we need to take some action about replicated inflight move options.
+ onLeaderReadyAndOutOfSafeMode();
}
public Map<ContainerID, List<InflightAction>> getInflightReplication() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
deleted file mode 100644
index 92a30d5c26..0000000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import javax.management.ObjectName;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.metrics2.util.MBeans;
-
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.utils.Scheduler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Event listener to track the current state of replication.
- */
-public class ReplicationActivityStatus implements
- ReplicationActivityStatusMXBean, Closeable {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ReplicationActivityStatus.class);
-
- private Scheduler scheduler;
- private AtomicBoolean replicationEnabled = new AtomicBoolean();
- private ObjectName jmxObjectName;
-
- public ReplicationActivityStatus(Scheduler scheduler) {
- this.scheduler = scheduler;
- }
-
- @Override
- public boolean isReplicationEnabled() {
- return replicationEnabled.get();
- }
-
- @VisibleForTesting
- @Override
- public void setReplicationEnabled(boolean enabled) {
- replicationEnabled.set(enabled);
- }
-
- @VisibleForTesting
- public void enableReplication() {
- replicationEnabled.set(true);
- }
-
-
- public void start() {
- try {
- this.jmxObjectName =
- MBeans.register(
- "StorageContainerManager", "ReplicationActivityStatus", this);
- } catch (Exception ex) {
- LOG.error("JMX bean for ReplicationActivityStatus can't be registered",
- ex);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (this.jmxObjectName != null) {
- MBeans.unregister(jmxObjectName);
- }
- }
-
- /**
- * Waits for
- * {@link HddsConfigKeys#HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT} and set
- * replicationEnabled to start replication monitor thread.
- */
- public void fireReplicationStart(boolean safeModeStatus,
- long waitTime) {
- if (!safeModeStatus) {
- scheduler.schedule(() -> {
- setReplicationEnabled(true);
- LOG.info("Replication Timer sleep for {} ms completed. Enable "
- + "Replication", waitTime);
- }, waitTime, TimeUnit.MILLISECONDS);
- }
- }
-
-
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
new file mode 100644
index 0000000000..e4020955f7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.util.ExitUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Replication Manager (RM) is the one which is responsible for making sure
+ * that the containers are properly replicated. Replication Manager deals only
+ * with Quasi Closed / Closed container.
+ */
+public class ReplicationManager implements SCMService {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ReplicationManager.class);
+
+ /**
+ * Reference to the ContainerManager.
+ */
+ private final ContainerManager containerManager;
+
+
+ /**
+ * SCMContext from StorageContainerManager.
+ */
+ private final SCMContext scmContext;
+
+
+ /**
+ * ReplicationManager specific configuration.
+ */
+ private final ReplicationManagerConfiguration rmConf;
+
+ /**
+ * ReplicationMonitor thread is the one which wakes up at configured
+ * interval and processes all the containers.
+ */
+ private Thread replicationMonitor;
+
+ /**
+ * Flag used for checking if the ReplicationMonitor thread is running or
+ * not.
+ */
+ private volatile boolean running;
+
+ /**
+ * Report object that is refreshed each time replication Manager runs.
+ */
+ private ReplicationManagerReport containerReport;
+
+ /**
+ * Replication progress related metrics.
+ */
+ private ReplicationManagerMetrics metrics;
+
+
+ /**
+ * Legacy RM will hopefully be removed after completing refactor
+ * for now, it is used to process non-EC container.
+ */
+ private LegacyReplicationManager legacyReplicationManager;
+
+ /**
+ * SCMService related variables.
+ * After leaving safe mode, replicationMonitor needs to wait for a while
+ * before really take effect.
+ */
+ private final Lock serviceLock = new ReentrantLock();
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+ private final long waitTimeInMillis;
+ private long lastTimeToBeReadyInMillis = 0;
+ private final Clock clock;
+
+ /**
+ * Constructs ReplicationManager instance with the given configuration.
+ *
+ * @param conf OzoneConfiguration
+ * @param containerManager ContainerManager
+ * @param containerPlacement PlacementPolicy
+ * @param eventPublisher EventPublisher
+ */
+ @SuppressWarnings("parameternumber")
+ public ReplicationManager(final ConfigurationSource conf,
+ final ContainerManager containerManager,
+ final PlacementPolicy containerPlacement,
+ final EventPublisher eventPublisher,
+ final SCMContext scmContext,
+ final SCMServiceManager serviceManager,
+ final NodeManager nodeManager,
+ final Clock clock,
+ final SCMHAManager scmhaManager,
+ final Table<ContainerID, MoveDataNodePair> moveTable)
+ throws IOException {
+ this.containerManager = containerManager;
+ this.scmContext = scmContext;
+ this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
+ this.running = false;
+ this.clock = clock;
+ this.containerReport = new ReplicationManagerReport();
+ this.metrics = null;
+ this.waitTimeInMillis = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.legacyReplicationManager = new LegacyReplicationManager(
+ conf, containerManager, containerPlacement, eventPublisher,
+ scmContext, nodeManager, scmhaManager, clock, moveTable);
+
+ // register ReplicationManager to SCMServiceManager.
+ serviceManager.register(this);
+
+ // start ReplicationManager.
+ start();
+ }
+
+ /**
+ * Starts Replication Monitor thread.
+ */
+ @Override
+ public synchronized void start() {
+ if (!isRunning()) {
+ LOG.info("Starting Replication Monitor Thread.");
+ running = true;
+ metrics = ReplicationManagerMetrics.create(this);
+ legacyReplicationManager.setMetrics(metrics);
+ replicationMonitor = new Thread(this::run);
+ replicationMonitor.setName("ReplicationMonitor");
+ replicationMonitor.setDaemon(true);
+ replicationMonitor.start();
+ } else {
+ LOG.info("Replication Monitor Thread is already running.");
+ }
+ }
+
+ /**
+ * Returns true if the Replication Monitor Thread is running.
+ *
+ * @return true if running, false otherwise
+ */
+ public boolean isRunning() {
+ if (!running) {
+ synchronized (this) {
+ return replicationMonitor != null
+ && replicationMonitor.isAlive();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Stops Replication Monitor thread.
+ */
+ public synchronized void stop() {
+ if (running) {
+ LOG.info("Stopping Replication Monitor Thread.");
+ running = false;
+ legacyReplicationManager.clearInflightActions();
+ metrics.unRegister();
+ replicationMonitor.interrupt();
+ } else {
+ LOG.info("Replication Monitor Thread is not running.");
+ }
+ }
+
+ /**
+ * Process all the containers now, and wait for the processing to complete.
+ * This in intended to be used in tests.
+ */
+ public synchronized void processAll() {
+ if (!shouldRun()) {
+ LOG.info("Replication Manager is not ready to run until {}ms after " +
+ "safemode exit", waitTimeInMillis);
+ return;
+ }
+ final long start = clock.millis();
+ final List<ContainerInfo> containers =
+ containerManager.getContainers();
+ ReplicationManagerReport report = new ReplicationManagerReport();
+ for (ContainerInfo c : containers) {
+ if (!shouldRun()) {
+ break;
+ }
+ switch (c.getReplicationType()) {
+ case EC:
+ break;
+ default:
+ legacyReplicationManager.processContainer(c, report);
+ }
+ }
+ report.setComplete();
+ this.containerReport = report;
+ LOG.info("Replication Monitor Thread took {} milliseconds for" +
+ " processing {} containers.", clock.millis() - start,
+ containers.size());
+ }
+
+ public ReplicationManagerReport getContainerReport() {
+ return containerReport;
+ }
+
+ /**
+ * ReplicationMonitor thread runnable. This wakes up at configured
+ * interval and processes all the containers in the system.
+ */
+ private synchronized void run() {
+ try {
+ while (running) {
+ processAll();
+ wait(rmConf.getInterval());
+ }
+ } catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ LOG.info("Replication Monitor Thread is stopped");
+ Thread.currentThread().interrupt();
+ } else {
+ // When we get runtime exception, we should terminate SCM.
+ LOG.error("Exception in Replication Monitor Thread.", t);
+ ExitUtil.terminate(1, t);
+ }
+ }
+ }
+
+ /**
+ * Given a ContainerID, lookup the ContainerInfo and then return a
+ * ContainerReplicaCount object for the container.
+ * @param containerID The ID of the container
+ * @return ContainerReplicaCount for the given container
+ * @throws ContainerNotFoundException
+ */
+ public ContainerReplicaCount getContainerReplicaCount(ContainerID containerID)
+ throws ContainerNotFoundException {
+ return legacyReplicationManager.getContainerReplicaCount(containerID);
+ }
+
+ /**
+ * Configuration used by the Replication Manager.
+ */
+ @ConfigGroup(prefix = "hdds.scm.replication")
+ public static class ReplicationManagerConfiguration {
+ /**
+ * The frequency in which ReplicationMonitor thread should run.
+ */
+ @Config(key = "thread.interval",
+ type = ConfigType.TIME,
+ defaultValue = "300s",
+ tags = {SCM, OZONE},
+ description = "There is a replication monitor thread running inside " +
+ "SCM which takes care of replicating the containers in the " +
+ "cluster. This property is used to configure the interval in " +
+ "which that thread runs."
+ )
+ private long interval = Duration.ofSeconds(300).toMillis();
+
+ /**
+ * Timeout for container replication & deletion command issued by
+ * ReplicationManager.
+ */
+ @Config(key = "event.timeout",
+ type = ConfigType.TIME,
+ defaultValue = "30m",
+ tags = {SCM, OZONE},
+ description = "Timeout for the container replication/deletion commands "
+ + "sent to datanodes. After this timeout the command will be "
+ + "retried.")
+ private long eventTimeout = Duration.ofMinutes(30).toMillis();
+ public void setInterval(Duration interval) {
+ this.interval = interval.toMillis();
+ }
+
+ public void setEventTimeout(Duration timeout) {
+ this.eventTimeout = timeout.toMillis();
+ }
+
+ /**
+ * The number of container replica which must be available for a node to
+ * enter maintenance.
+ */
+ @Config(key = "maintenance.replica.minimum",
+ type = ConfigType.INT,
+ defaultValue = "2",
+ tags = {SCM, OZONE},
+ description = "The minimum number of container replicas which must " +
+ " be available for a node to enter maintenance. If putting a " +
+ " node into maintenance reduces the available replicas for any " +
+ " container below this level, the node will remain in the " +
+ " entering maintenance state until a new replica is created.")
+ private int maintenanceReplicaMinimum = 2;
+
+ public void setMaintenanceReplicaMinimum(int replicaCount) {
+ this.maintenanceReplicaMinimum = replicaCount;
+ }
+
+ public long getInterval() {
+ return interval;
+ }
+
+ public long getEventTimeout() {
+ return eventTimeout;
+ }
+
+ public int getMaintenanceReplicaMinimum() {
+ return maintenanceReplicaMinimum;
+ }
+ }
+
+ @Override
+ public void notifyStatusChanged() {
+ serviceLock.lock();
+ try {
+ // 1) SCMContext#isLeaderReady returns true.
+ // 2) not in safe mode.
+ if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+ // transition from PAUSING to RUNNING
+ if (serviceStatus != ServiceStatus.RUNNING) {
+ LOG.info("Service {} transitions to RUNNING.", getServiceName());
+ lastTimeToBeReadyInMillis = clock.millis();
+ serviceStatus = ServiceStatus.RUNNING;
+ }
+ //now, as the current scm is leader and it`s state is up-to-date,
+ //we need to take some action about replicated inflight move options.
+ legacyReplicationManager.notifyStatusChanged();
+ } else {
+ serviceStatus = ServiceStatus.PAUSING;
+ }
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean shouldRun() {
+ serviceLock.lock();
+ try {
+ // If safe mode is off, then this SCMService starts to run with a delay.
+ return serviceStatus == ServiceStatus.RUNNING &&
+ clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return ReplicationManager.class.getSimpleName();
+ }
+
+ public ReplicationManagerMetrics getMetrics() {
+ return metrics;
+ }
+
+
+ /**
+ * following functions will be refactored in a seperate jira.
+ */
+ public CompletableFuture<LegacyReplicationManager.MoveResult> move(
+ ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+ throws NodeNotFoundException, ContainerNotFoundException {
+ CompletableFuture<LegacyReplicationManager.MoveResult> ret =
+ new CompletableFuture<>();
+ if (!isRunning()) {
+ ret.complete(LegacyReplicationManager.MoveResult.FAIL_NOT_RUNNING);
+ return ret;
+ }
+
+ return legacyReplicationManager.move(cid, src, tgt);
+ }
+
+ public Map<ContainerID, List<InflightAction>> getInflightReplication() {
+ return legacyReplicationManager.getInflightReplication();
+ }
+
+ public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
+ return legacyReplicationManager.getInflightDeletion();
+ }
+
+ public Map<ContainerID,
+ CompletableFuture<LegacyReplicationManager.MoveResult>>
+ getInflightMove() {
+ return legacyReplicationManager.getInflightMove();
+ }
+
+ public LegacyReplicationManager.MoveScheduler getMoveScheduler() {
+ return legacyReplicationManager.getMoveScheduler();
+ }
+
+ @VisibleForTesting
+ public LegacyReplicationManager getLegacyReplicationManager() {
+ return legacyReplicationManager;
+ }
+
+ public boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
+ return legacyReplicationManager
+ .isContainerReplicatingOrDeleting(containerID);
+ }
+}
+
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
index 0f828aef1a..ef17b587d7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.base.CaseFormat;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index a0fb6c99b2..f096f4e83f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 47d7c53469..1ea04cdfc3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
index fca789d56c..68adc76a45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
@@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.node.NodeManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
index d56cbf974d..d02f391585 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index caa1043f5b..9abe25969b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -82,7 +82,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 233bed7d45..c7e36dbe67 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -31,12 +31,14 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -127,7 +129,8 @@ public class TestReplicationManager {
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, TimeUnit.SECONDS);
- scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG);
+ scmLogs = GenericTestUtils.LogCapturer.
+ captureLogs(LegacyReplicationManager.LOG);
containerManager = Mockito.mock(ContainerManager.class);
nodeManager = new SimpleMockNodeManager();
eventQueue = new EventQueue();
@@ -1479,8 +1482,7 @@ public class TestReplicationManager {
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
- replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
@@ -1521,8 +1523,7 @@ public class TestReplicationManager {
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
- replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
@@ -1617,8 +1618,7 @@ public class TestReplicationManager {
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
- replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn4));
+ replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
Assert.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
@@ -1659,8 +1659,7 @@ public class TestReplicationManager {
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
- replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
@@ -1672,8 +1671,7 @@ public class TestReplicationManager {
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
addReplicaToDn(container, dn3, CLOSED);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -1712,30 +1710,26 @@ public class TestReplicationManager {
//we don't need a running replicationManamger now
replicationManager.stop();
Thread.sleep(100L);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.FAIL_NOT_RUNNING);
replicationManager.start();
Thread.sleep(100L);
//container in not in OPEN state
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
//open -> closing
containerStateManager.updateContainerState(id.getProtobuf(),
LifeCycleEvent.FINALIZE);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
//closing -> quasi_closed
containerStateManager.updateContainerState(id.getProtobuf(),
LifeCycleEvent.QUASI_CLOSE);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
@@ -1750,12 +1744,10 @@ public class TestReplicationManager {
if (state != HEALTHY) {
nodeManager.setNodeStatus(dn3,
new NodeStatus(IN_SERVICE, state));
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
+ cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
}
@@ -1768,12 +1760,10 @@ public class TestReplicationManager {
if (state != IN_SERVICE) {
nodeManager.setNodeStatus(dn3,
new NodeStatus(state, HEALTHY));
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
+ cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
}
@@ -1781,14 +1771,13 @@ public class TestReplicationManager {
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
//container exists in target datanode
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(),
- dn2.getDatanodeDetails()));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(),
+ dn2.getDatanodeDetails());
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
//container does not exist in source datanode
- cf = replicationManager.move(id, new MoveDataNodePair(dn3, dn3));
+ cf = replicationManager.move(id, dn3, dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
@@ -1799,8 +1788,7 @@ public class TestReplicationManager {
replicationManager.processAll();
//waiting for inflightDeletion generation
eventQueue.processAll(1000);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
resetReplicationManager();
@@ -1813,8 +1801,7 @@ public class TestReplicationManager {
replicationManager.processAll();
//waiting for inflightReplication generation
eventQueue.processAll(1000);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 48069c505d..4901617f9f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -134,8 +136,7 @@ public class TestContainerBalancer {
Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
Mockito.any(DatanodeDetails.class),
Mockito.any(DatanodeDetails.class)))
- .thenReturn(CompletableFuture.completedFuture(
- ReplicationManager.MoveResult.COMPLETED));
+ .thenReturn(CompletableFuture.completedFuture(MoveResult.COMPLETED));
when(containerManager.getContainerReplicas(Mockito.any(ContainerID.class)))
.thenAnswer(invocationOnMock -> {
@@ -631,7 +632,7 @@ public class TestContainerBalancer {
Mockito.any(DatanodeDetails.class),
Mockito.any(DatanodeDetails.class)))
.thenReturn(CompletableFuture.completedFuture(
- ReplicationManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY));
+ MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY));
balancerConfiguration.setMaxSizeToMovePerIteration(10 * OzoneConsts.GB);
startBalancer(balancerConfiguration);
@@ -868,7 +869,7 @@ public class TestContainerBalancer {
}
}
- private CompletableFuture<ReplicationManager.MoveResult>
+ private CompletableFuture<LegacyReplicationManager.MoveResult>
genCompletableFuture(int sleepMilSec) {
return CompletableFuture.supplyAsync(() -> {
try {
@@ -876,7 +877,7 @@ public class TestContainerBalancer {
} catch (InterruptedException e) {
e.printStackTrace();
}
- return ReplicationManager.MoveResult.COMPLETED;
+ return LegacyReplicationManager.MoveResult.COMPLETED;
});
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
index 60ebbb0b86..977194d1d5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.junit.After;
import org.junit.Assert;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 82822a1060..ba03004e6a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
diff --git a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index b6ef8651b2..87a852b875 100644
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.failure.FailureManager;
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java
index 6d607b702f..e42f7fc855 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.ozone.insight.BaseInsightPoint;
import org.apache.hadoop.ozone.insight.Component.Type;
import org.apache.hadoop.ozone.insight.LoggerSource;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 2b52564059..09f7b1909f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 2149dc3271..7fc05e92b1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -17,45 +17,7 @@
*/
package org.apache.hadoop.ozone;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
- .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys
- .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys
- .HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys
- .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
-import static org.junit.Assert.fail;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
-import java.util.Arrays;
-
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.scm.HddsTestUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
@@ -66,10 +28,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -78,8 +40,8 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.RatisUtil;
@@ -108,16 +70,16 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.ozone.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.mockito.ArgumentMatcher;
@@ -125,6 +87,38 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+
/**
* Test class that exercises the StorageContainerManager.
*/
@@ -782,8 +776,10 @@ public class TestStorageContainerManager {
cluster.restartStorageContainerManager(false);
scm = cluster.getStorageContainerManager();
EventPublisher publisher = mock(EventPublisher.class);
- ReplicationManager replicationManager = scm.getReplicationManager();
- Field f = ReplicationManager.class.getDeclaredField("eventPublisher");
+ LegacyReplicationManager replicationManager =
+ scm.getReplicationManager().getLegacyReplicationManager();
+ Field f = LegacyReplicationManager.class.
+ getDeclaredField("eventPublisher");
f.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index bc4012c0d1..5ac3f15fd3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.HddsDatanodeService;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 07c24336aa..f3a5ecdef0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 1eb153de6f..6d49f46092 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index e2f2e1631a..98b524c5f1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.hdds.scm.block.ScmBlockDeletingServiceMetrics;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -115,7 +115,7 @@ public class TestBlockDeletion {
conf = new OzoneConfiguration();
GenericTestUtils.setLogLevel(DeletedBlockLogImpl.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(SCMBlockDeletingService.LOG, Level.DEBUG);
- GenericTestUtils.setLogLevel(ReplicationManager.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(LegacyReplicationManager.LOG, Level.DEBUG);
String path =
GenericTestUtils.getTempPath(TestBlockDeletion.class.getSimpleName());
@@ -367,7 +367,7 @@ public class TestBlockDeletion {
Assert.assertEquals(HddsProtos.LifeCycleState.DELETING,
container.getState()));
LogCapturer logCapturer =
- LogCapturer.captureLogs(ReplicationManager.LOG);
+ LogCapturer.captureLogs(LegacyReplicationManager.LOG);
logCapturer.clearOutput();
scm.getReplicationManager().processAll();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java
index 2e67a88347..3336408287 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 65eb4448dc..9c40c84339 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index c693bd2cbd..07d9f3fd19 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org