You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/06/27 16:33:47 UTC
[ozone] branch master updated: HDDS-6699. EC: ReplicationManager - collect under and over replicated containers (#3545)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5cf5c97aee HDDS-6699. EC: ReplicationManager - collect under and over replicated containers (#3545)
5cf5c97aee is described below
commit 5cf5c97aee67ea4f88658cc02e7fb80e8fd5e60f
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Mon Jun 27 17:33:42 2022 +0100
HDDS-6699. EC: ReplicationManager - collect under and over replicated containers (#3545)
---
.../replication/ContainerHealthCheck.java | 5 +-
.../container/replication/ContainerReplicaOp.java | 6 +
.../replication/ECContainerHealthCheck.java | 31 +-
.../replication/LegacyReplicationManager.java | 1 -
.../container/replication/ReplicationManager.java | 86 +-
.../hdds/scm/server/StorageContainerManager.java | 10 +-
.../container/replication/ReplicationTestUtil.java | 99 +
.../replication/TestECContainerHealthCheck.java | 108 +-
...ager.java => TestLegacyReplicationManager.java} | 12 +-
.../replication/TestReplicationManager.java | 2229 ++------------------
10 files changed, 395 insertions(+), 2192 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
index fed85b8069..5d14d04e45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
@@ -16,8 +16,6 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -32,7 +30,6 @@ public interface ContainerHealthCheck {
ContainerHealthResult checkHealth(
ContainerInfo container, Set<ContainerReplica> replicas,
- List<Pair<Integer, DatanodeDetails>> indexesPendingAdd,
- List<Pair<Integer, DatanodeDetails>> indexesPendingDelete,
+ List<ContainerReplicaOp> replicaPendingOps,
int remainingRedundancyForMaintenance);
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
index 869b0f0cf2..ae3e8f4530 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -36,6 +36,12 @@ public class ContainerReplicaOp {
private int replicaIndex;
private long scheduledEpochMillis;
+ public static ContainerReplicaOp create(PendingOpType opType,
+ DatanodeDetails target, int replicaIndex) {
+ return new ContainerReplicaOp(opType, target, replicaIndex,
+ System.currentTimeMillis());
+ }
+
public ContainerReplicaOp(PendingOpType opType,
DatanodeDetails target, int replicaIndex, long scheduledTime) {
this.opType = opType;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
index b879867d85..68761b649d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
@@ -16,16 +16,14 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ECContainerReplicaCount;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* Class to determine the health state of an EC Container. Given the container
@@ -49,11 +47,10 @@ public class ECContainerHealthCheck implements ContainerHealthCheck {
@Override
public ContainerHealthResult checkHealth(ContainerInfo container,
Set<ContainerReplica> replicas,
- List<Pair<Integer, DatanodeDetails>> replicasPendingAdd,
- List<Pair<Integer, DatanodeDetails>> replicasPendingDelete,
+ List<ContainerReplicaOp> replicaPendingOps,
int remainingRedundancyForMaintenance) {
ECContainerReplicaCount replicaCount = getReplicaCountWithPending(container,
- replicas, replicasPendingAdd, replicasPendingDelete,
+ replicas, replicaPendingOps,
remainingRedundancyForMaintenance);
ECReplicationConfig repConfig =
@@ -91,17 +88,19 @@ public class ECContainerHealthCheck implements ContainerHealthCheck {
private ECContainerReplicaCount getReplicaCountWithPending(
ContainerInfo container, Set<ContainerReplica> replicas,
- List<Pair<Integer, DatanodeDetails>> replicasPendingAdd,
- List<Pair<Integer, DatanodeDetails>> replicasPendingDelete,
+ List<ContainerReplicaOp> replicaPendingOps,
int remainingRedundancyForMaintenance) {
- List<Integer> indexesPendingAdd = replicasPendingAdd.stream()
- .map(i -> i.getLeft()).collect(Collectors.toList());
- List<Integer> indexesPendingDelete = replicasPendingDelete.stream()
- .map(i -> i.getLeft()).collect(Collectors.toList());
-
- return new ECContainerReplicaCount(container, replicas, indexesPendingAdd,
- indexesPendingDelete, remainingRedundancyForMaintenance);
-
+ List<Integer> pendingAdd = new ArrayList<>();
+ List<Integer> pendingDelete = new ArrayList<>();
+ for (ContainerReplicaOp op : replicaPendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ pendingAdd.add(op.getReplicaIndex());
+ } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ pendingDelete.add(op.getReplicaIndex());
+ }
+ }
+ return new ECContainerReplicaCount(container, replicas, pendingAdd,
+ pendingDelete, remainingRedundancyForMaintenance);
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 37d01736b9..79e11bab5a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -404,7 +404,6 @@ public class LegacyReplicationManager {
final Set<ContainerReplica> replicas = containerManager
.getContainerReplicas(id);
final LifeCycleState state = container.getState();
- report.increment(state);
/*
* We don't take any action if the container is in OPEN state and
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index d0811ce2e2..b4e88cc60d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -30,17 +30,15 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.util.ExitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +46,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -57,6 +57,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
/**
* Replication Manager (RM) is the one which is responsible for making sure
@@ -125,6 +126,8 @@ public class ReplicationManager implements SCMService {
private long lastTimeToBeReadyInMillis = 0;
private final Clock clock;
private final ContainerReplicaPendingOps containerReplicaPendingOps;
+ private final ContainerHealthCheck ecContainerHealthCheck;
+ private final EventPublisher eventPublisher;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -140,11 +143,9 @@ public class ReplicationManager implements SCMService {
final PlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
final SCMContext scmContext,
- final SCMServiceManager serviceManager,
final NodeManager nodeManager,
final Clock clock,
- final SCMHAManager scmhaManager,
- final Table<ContainerID, MoveDataNodePair> moveTable,
+ final LegacyReplicationManager legacyReplicationManager,
final ContainerReplicaPendingOps replicaPendingOps)
throws IOException {
this.containerManager = containerManager;
@@ -154,19 +155,14 @@ public class ReplicationManager implements SCMService {
this.clock = clock;
this.containerReport = new ReplicationManagerReport();
this.metrics = null;
+ this.eventPublisher = eventPublisher;
this.waitTimeInMillis = conf.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
this.containerReplicaPendingOps = replicaPendingOps;
- this.legacyReplicationManager = new LegacyReplicationManager(
- conf, containerManager, containerPlacement, eventPublisher,
- scmContext, nodeManager, scmhaManager, clock, moveTable);
-
- // register ReplicationManager to SCMServiceManager.
- serviceManager.register(this);
-
- // start ReplicationManager.
+ this.legacyReplicationManager = legacyReplicationManager;
+ this.ecContainerHealthCheck = new ECContainerHealthCheck();
start();
}
@@ -233,24 +229,76 @@ public class ReplicationManager implements SCMService {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
+ new ArrayList<>();
+ List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
+ new ArrayList<>();
+
for (ContainerInfo c : containers) {
if (!shouldRun()) {
break;
}
- switch (c.getReplicationType()) {
- case EC:
- break;
- default:
+ report.increment(c.getState());
+ if (c.getReplicationType() != EC) {
legacyReplicationManager.processContainer(c, report);
+ continue;
+ }
+ try {
+ processContainer(c, underReplicated, overReplicated, report);
+ // TODO - send any commands contained in the health result
+ } catch (ContainerNotFoundException e) {
+ LOG.error("Container {} not found", c.getContainerID(), e);
}
}
report.setComplete();
+ // TODO - Sort the pending lists by priority and assign to the main queue,
+ // which is yet to be defined.
this.containerReport = report;
LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", clock.millis() - start,
containers.size());
}
+ protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
+ List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
+ ReplicationManagerReport report) throws ContainerNotFoundException {
+ Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
+ containerInfo.containerID());
+ List<ContainerReplicaOp> pendingOps =
+ containerReplicaPendingOps.getPendingOps(containerInfo.containerID());
+ ContainerHealthResult health = ecContainerHealthCheck
+ .checkHealth(containerInfo, replicas, pendingOps, 0);
+ // TODO - should the report have a HEALTHY state, rather than just bad
+ // states? It would need to be added to legacy RM too.
+ if (health.getHealthState()
+ == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
+ report.incrementAndSample(
+ HealthState.UNDER_REPLICATED, containerInfo.containerID());
+ ContainerHealthResult.UnderReplicatedHealthResult underHealth
+ = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
+ if (underHealth.isUnrecoverable()) {
+ // TODO - do we need a new health state for unrecoverable EC?
+ report.incrementAndSample(
+ HealthState.MISSING, containerInfo.containerID());
+ }
+ if (!underHealth.isSufficientlyReplicatedAfterPending() &&
+ !underHealth.isUnrecoverable()) {
+ underRep.add(underHealth);
+ }
+ } else if (health.getHealthState()
+ == ContainerHealthResult.HealthState.OVER_REPLICATED) {
+ report.incrementAndSample(HealthState.OVER_REPLICATED,
+ containerInfo.containerID());
+ ContainerHealthResult.OverReplicatedHealthResult overHealth
+ = ((ContainerHealthResult.OverReplicatedHealthResult) health);
+ if (!overHealth.isSufficientlyReplicatedAfterPending()) {
+ overRep.add(overHealth);
+ }
+ }
+ return health;
+ }
+
public ReplicationManagerReport getContainerReport() {
return containerReport;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7ea21a2627..3da1d74c81 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
@@ -708,19 +709,22 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
if (configurator.getReplicationManager() != null) {
replicationManager = configurator.getReplicationManager();
} else {
+ LegacyReplicationManager legacyRM = new LegacyReplicationManager(
+ conf, containerManager, containerPlacementPolicy, eventQueue,
+ scmContext, scmNodeManager, scmHAManager, clock,
+ getScmMetadataStore().getMoveTable());
replicationManager = new ReplicationManager(
conf,
containerManager,
containerPlacementPolicy,
eventQueue,
scmContext,
- serviceManager,
scmNodeManager,
clock,
- scmHAManager,
- getScmMetadataStore().getMoveTable(),
+ legacyRM,
containerReplicaPendingOps);
}
+ serviceManager.register(replicationManager);
if (configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
} else {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
new file mode 100644
index 0000000000..3fe8ddddf3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Helper class to provide common methods used to test ReplicationManager.
+ */
+public final class ReplicationTestUtil {
+
+ private ReplicationTestUtil() {
+ }
+
+ public static Set<ContainerReplica> createReplicas(ContainerID containerID,
+ Pair<HddsProtos.NodeOperationalState, Integer>... nodes) {
+ Set<ContainerReplica> replicas = new HashSet<>();
+ for (Pair<HddsProtos.NodeOperationalState, Integer> p : nodes) {
+ replicas.add(
+ createContainerReplica(containerID, p.getRight(), p.getLeft()));
+ }
+ return replicas;
+ }
+
+ public static Set<ContainerReplica> createReplicas(ContainerID containerID,
+ int... indexes) {
+ Set<ContainerReplica> replicas = new HashSet<>();
+ for (int i : indexes) {
+ replicas.add(createContainerReplica(
+ containerID, i, IN_SERVICE));
+ }
+ return replicas;
+ }
+
+ public static ContainerReplica createContainerReplica(ContainerID containerID,
+ int replicaIndex, HddsProtos.NodeOperationalState opState) {
+ ContainerReplica.ContainerReplicaBuilder builder
+ = ContainerReplica.newBuilder();
+ DatanodeDetails datanodeDetails
+ = MockDatanodeDetails.randomDatanodeDetails();
+ datanodeDetails.setPersistedOpState(opState);
+ builder.setContainerID(containerID);
+ builder.setReplicaIndex(replicaIndex);
+ builder.setKeyCount(123);
+ builder.setBytesUsed(1234);
+ builder.setContainerState(StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED);
+ builder.setDatanodeDetails(datanodeDetails);
+ builder.setSequenceId(0);
+ builder.setOriginNodeId(datanodeDetails.getUuid());
+ return builder.build();
+ }
+
+ public static ContainerInfo createContainerInfo(ReplicationConfig repConfig) {
+ return createContainerInfo(repConfig, 1, HddsProtos.LifeCycleState.CLOSED);
+ }
+
+ public static ContainerInfo createContainerInfo(
+ ReplicationConfig replicationConfig, long containerID,
+ HddsProtos.LifeCycleState containerState) {
+ ContainerInfo.Builder builder = new ContainerInfo.Builder();
+ builder.setContainerID(containerID);
+ builder.setOwner("Ozone");
+ builder.setPipelineID(PipelineID.randomId());
+ builder.setReplicationConfig(replicationConfig);
+ builder.setState(containerState);
+ return builder.build();
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
index b22999a2f8..d8060dae16 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
@@ -18,18 +18,12 @@ package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.junit.Assert;
import org.junit.Before;
@@ -37,7 +31,6 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -45,6 +38,10 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalSt
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
/**
* Tests for the ECContainerHealthCheck class.
@@ -66,7 +63,7 @@ public class TestECContainerHealthCheck {
Set<ContainerReplica> replicas
= createReplicas(container.containerID(), 1, 2, 3, 4, 5);
ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), Collections.emptyList(), 2);
+ Collections.emptyList(), 2);
Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
}
@@ -77,7 +74,7 @@ public class TestECContainerHealthCheck {
= createReplicas(container.containerID(), 1, 2, 4, 5);
UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), Collections.emptyList(), 2);
+ Collections.emptyList(), 2);
Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
Assert.assertEquals(1, result.getRemainingRedundancy());
Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
@@ -89,11 +86,11 @@ public class TestECContainerHealthCheck {
ContainerInfo container = createContainerInfo(repConfig);
Set<ContainerReplica> replicas
= createReplicas(container.containerID(), 1, 2, 4, 5);
- List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
- pending.add(Pair.of(3, MockDatanodeDetails.randomDatanodeDetails()));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, pending,
- Collections.emptyList(), 2);
+ healthCheck.checkHealth(container, replicas, pending, 2);
Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
Assert.assertEquals(1, result.getRemainingRedundancy());
Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
@@ -110,7 +107,7 @@ public class TestECContainerHealthCheck {
UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
healthCheck.checkHealth(container, replicas, Collections.emptyList(),
- Collections.emptyList(), 2);
+ 2);
Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
Assert.assertEquals(2, result.getRemainingRedundancy());
Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
@@ -124,12 +121,12 @@ public class TestECContainerHealthCheck {
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
Pair.of(IN_SERVICE, 4), Pair.of(DECOMMISSIONED, 5));
- List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
- pending.add(Pair.of(5, MockDatanodeDetails.randomDatanodeDetails()));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 5));
UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, pending,
- Collections.emptyList(), 2);
+ healthCheck.checkHealth(container, replicas, pending, 2);
Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
Assert.assertEquals(2, result.getRemainingRedundancy());
Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
@@ -142,12 +139,12 @@ public class TestECContainerHealthCheck {
Set<ContainerReplica> replicas = createReplicas(container.containerID(),
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONED, 5));
- List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
- pending.add(Pair.of(3, MockDatanodeDetails.randomDatanodeDetails()));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, pending,
- Collections.emptyList(), 2);
+ healthCheck.checkHealth(container, replicas, pending, 2);
Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
Assert.assertEquals(1, result.getRemainingRedundancy());
Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
@@ -161,7 +158,7 @@ public class TestECContainerHealthCheck {
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, Collections.emptyList(),
+ healthCheck.checkHealth(container, replicas,
Collections.emptyList(), 2);
Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
Assert.assertEquals(-1, result.getRemainingRedundancy());
@@ -179,13 +176,14 @@ public class TestECContainerHealthCheck {
Pair.of(IN_SERVICE, 5),
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
- List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
- pending.add(Pair.of(1, MockDatanodeDetails.randomDatanodeDetails()));
- pending.add(Pair.of(2, MockDatanodeDetails.randomDatanodeDetails()));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 1));
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 2));
OverReplicatedHealthResult result = (OverReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, Collections.emptyList(),
- pending, 2);
+ healthCheck.checkHealth(container, replicas, pending, 2);
Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
Assert.assertEquals(2, result.getExcessRedundancy());
Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
@@ -201,7 +199,7 @@ public class TestECContainerHealthCheck {
Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 2));
ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), Collections.emptyList(), 2);
+ Collections.emptyList(), 2);
Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
}
@@ -214,60 +212,10 @@ public class TestECContainerHealthCheck {
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), Collections.emptyList(), 2);
+ Collections.emptyList(), 2);
Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
Assert.assertEquals(1,
((UnderReplicatedHealthResult)result).getRemainingRedundancy());
}
-
- private Set<ContainerReplica> createReplicas(ContainerID containerID,
- Pair<HddsProtos.NodeOperationalState, Integer>... nodes) {
- Set<ContainerReplica> replicas = new HashSet<>();
- for (Pair<HddsProtos.NodeOperationalState, Integer> p : nodes) {
- replicas.add(
- createContainerReplica(containerID, p.getRight(), p.getLeft()));
- }
- return replicas;
- }
-
- private Set<ContainerReplica> createReplicas(ContainerID containerID,
- int... indexes) {
- Set<ContainerReplica> replicas = new HashSet<>();
- for (int i : indexes) {
- replicas.add(createContainerReplica(
- containerID, i, IN_SERVICE));
- }
- return replicas;
- }
-
- private ContainerReplica createContainerReplica(ContainerID containerID,
- int replicaIndex, HddsProtos.NodeOperationalState opState) {
- ContainerReplica.ContainerReplicaBuilder builder
- = ContainerReplica.newBuilder();
- DatanodeDetails datanodeDetails
- = MockDatanodeDetails.randomDatanodeDetails();
- datanodeDetails.setPersistedOpState(opState);
- builder.setContainerID(containerID);
- builder.setReplicaIndex(replicaIndex);
- builder.setKeyCount(123);
- builder.setBytesUsed(1234);
- builder.setContainerState(StorageContainerDatanodeProtocolProtos
- .ContainerReplicaProto.State.CLOSED);
- builder.setDatanodeDetails(datanodeDetails);
- builder.setSequenceId(0);
- builder.setOriginNodeId(datanodeDetails.getUuid());
- return builder.build();
- }
-
- private ContainerInfo createContainerInfo(
- ReplicationConfig replicationConfig) {
- ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setContainerID(1);
- builder.setOwner("Ozone");
- builder.setPipelineID(PipelineID.randomId());
- builder.setReplicationConfig(replicationConfig);
- builder.setState(HddsProtos.LifeCycleState.CLOSED);
- return builder.build();
- }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
similarity index 99%
copy from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
copy to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 38ece4992d..66863ce553 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -113,7 +113,7 @@ import static org.mockito.Mockito.when;
/**
* Test cases to verify the functionality of ReplicationManager.
*/
-public class TestReplicationManager {
+public class TestLegacyReplicationManager {
private ReplicationManager replicationManager;
private ContainerStateManager containerStateManager;
@@ -256,19 +256,23 @@ public class TestReplicationManager {
dbStore = DBStoreBuilder.createDBStore(
config, new SCMDBDefinition());
+ LegacyReplicationManager legacyRM = new LegacyReplicationManager(
+ config, containerManager, containerPlacementPolicy, eventQueue,
+ SCMContext.emptyContext(), nodeManager, scmHAManager, clock,
+ SCMDBDefinition.MOVE.getTable(dbStore));
+
replicationManager = new ReplicationManager(
config,
containerManager,
containerPlacementPolicy,
eventQueue,
SCMContext.emptyContext(),
- serviceManager,
nodeManager,
clock,
- scmHAManager,
- SCMDBDefinition.MOVE.getTable(dbStore),
+ legacyRM,
containerReplicaPendingOps);
+ serviceManager.register(replicationManager);
serviceManager.notifyStatusChanged();
scmLogs.clearOutput();
Thread.sleep(100L);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 38ece4992d..fdce87f8ba 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -15,66 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hdds.scm.container.replication;
-import com.google.common.primitives.Longs;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManagerImpl;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
-import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
-import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .ReplicationManagerConfiguration;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
-import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
-import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.utils.db.DBStore;
-import org.apache.hadoop.hdds.scm.node.NodeStatus;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.mockito.Mockito;
-import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
@@ -82,2076 +45,212 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.createDatanodeDetails;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_NUM_KEYS_DEFAULT;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_USED_BYTES_DEFAULT;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static org.mockito.Mockito.when;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
/**
- * Test cases to verify the functionality of ReplicationManager.
+ * Tests for the ReplicationManager.
*/
public class TestReplicationManager {
+ private OzoneConfiguration configuration;
private ReplicationManager replicationManager;
- private ContainerStateManager containerStateManager;
- private PlacementPolicy containerPlacementPolicy;
- private EventQueue eventQueue;
- private DatanodeCommandHandler datanodeCommandHandler;
- private SimpleMockNodeManager nodeManager;
+ private LegacyReplicationManager legacyReplicationManager;
private ContainerManager containerManager;
- private GenericTestUtils.LogCapturer scmLogs;
- private SCMServiceManager serviceManager;
+ private PlacementPolicy placementPolicy;
+ private EventPublisher eventPublisher;
+ private SCMContext scmContext;
+ private NodeManager nodeManager;
private TestClock clock;
- private File testDir;
- private DBStore dbStore;
- private PipelineManager pipelineManager;
- private SCMHAManager scmhaManager;
private ContainerReplicaPendingOps containerReplicaPendingOps;
- int getInflightCount(InflightType type) {
- return replicationManager.getLegacyReplicationManager()
- .getInflightCount(type);
- }
-
- @BeforeEach
- public void setup()
- throws IOException, InterruptedException,
- NodeNotFoundException, InvalidStateTransitionException {
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
- 0, TimeUnit.SECONDS);
+ private Map<ContainerID, Set<ContainerReplica>> containerReplicaMap;
+ private ReplicationConfig repConfig;
+ private ReplicationManagerReport repReport;
+ private List<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
+ private List<ContainerHealthResult.OverReplicatedHealthResult> overRep;
- scmLogs = GenericTestUtils.LogCapturer.
- captureLogs(LegacyReplicationManager.LOG);
+ @Before
+ public void setup() throws IOException {
+ configuration = new OzoneConfiguration();
+ configuration.set(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0s");
containerManager = Mockito.mock(ContainerManager.class);
- nodeManager = new SimpleMockNodeManager();
- eventQueue = new EventQueue();
- scmhaManager = SCMHAManagerStub.getInstance(true);
- testDir = GenericTestUtils.getTestDir(
- TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
- dbStore = DBStoreBuilder.createDBStore(
- conf, new SCMDBDefinition());
- pipelineManager = Mockito.mock(PipelineManager.class);
- when(pipelineManager.containsPipeline(Mockito.any(PipelineID.class)))
- .thenReturn(true);
- containerStateManager = ContainerStateManagerImpl.newBuilder()
- .setConfiguration(conf)
- .setPipelineManager(pipelineManager)
- .setRatisServer(scmhaManager.getRatisServer())
- .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
- .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
- .build();
- serviceManager = new SCMServiceManager();
-
- datanodeCommandHandler = new DatanodeCommandHandler();
- eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
-
- Mockito.when(containerManager.getContainers())
- .thenAnswer(invocation -> {
- Set<ContainerID> ids = containerStateManager.getContainerIDs();
- List<ContainerInfo> containers = new ArrayList<>();
- for (ContainerID id : ids) {
- containers.add(containerStateManager.getContainer(
- id));
- }
- return containers;
- });
-
- Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
- .thenAnswer(invocation -> containerStateManager
- .getContainer(((ContainerID)invocation
- .getArguments()[0])));
-
- Mockito.when(containerManager.getContainerReplicas(
- Mockito.any(ContainerID.class)))
- .thenAnswer(invocation -> containerStateManager
- .getContainerReplicas(((ContainerID)invocation
- .getArguments()[0])));
-
- containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
-
- Mockito.when(containerPlacementPolicy.chooseDatanodes(
- Mockito.any(), Mockito.any(), Mockito.anyInt(),
- Mockito.anyLong(), Mockito.anyLong()))
- .thenAnswer(invocation -> {
- int count = (int) invocation.getArguments()[2];
- return IntStream.range(0, count)
- .mapToObj(i -> randomDatanodeDetails())
- .collect(Collectors.toList());
- });
-
- Mockito.when(containerPlacementPolicy.validateContainerPlacement(
- Mockito.any(),
- Mockito.anyInt()
- )).thenAnswer(invocation ->
- new ContainerPlacementStatusDefault(2, 2, 3));
- clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
- containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
- createReplicationManager(new ReplicationManagerConfiguration());
- }
-
- void createReplicationManager(int replicationLimit, int deletionLimit)
- throws Exception {
- replicationManager.stop();
- dbStore.close();
- final LegacyReplicationManager.ReplicationManagerConfiguration conf
- = new LegacyReplicationManager.ReplicationManagerConfiguration();
- conf.setContainerInflightReplicationLimit(replicationLimit);
- conf.setContainerInflightDeletionLimit(deletionLimit);
- createReplicationManager(conf);
- }
-
- void createReplicationManager(
- LegacyReplicationManager.ReplicationManagerConfiguration conf)
- throws Exception {
- createReplicationManager(null, conf);
- }
-
- private void createReplicationManager(ReplicationManagerConfiguration rmConf)
- throws InterruptedException, IOException {
- createReplicationManager(rmConf, null);
- }
-
- void createReplicationManager(ReplicationManagerConfiguration rmConf,
- LegacyReplicationManager.ReplicationManagerConfiguration lrmConf)
- throws InterruptedException, IOException {
- OzoneConfiguration config = new OzoneConfiguration();
- testDir = GenericTestUtils
- .getTestDir(TestContainerManagerImpl.class.getSimpleName());
- config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
- testDir.getAbsolutePath());
- config.setTimeDuration(
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
- 0, TimeUnit.SECONDS);
- Optional.ofNullable(rmConf).ifPresent(config::setFromObject);
- Optional.ofNullable(lrmConf).ifPresent(config::setFromObject);
-
- SCMHAManager scmHAManager = SCMHAManagerStub
- .getInstance(true, new SCMDBTransactionBufferImpl());
- dbStore = DBStoreBuilder.createDBStore(
- config, new SCMDBDefinition());
+ placementPolicy = Mockito.mock(PlacementPolicy.class);
+ eventPublisher = Mockito.mock(EventPublisher.class);
+ scmContext = Mockito.mock(SCMContext.class);
+ nodeManager = Mockito.mock(NodeManager.class);
+ legacyReplicationManager = Mockito.mock(LegacyReplicationManager.class);
+ clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+ containerReplicaPendingOps =
+ new ContainerReplicaPendingOps(configuration, clock);
+
+ Mockito.when(containerManager
+ .getContainerReplicas(Mockito.any(ContainerID.class))).thenAnswer(
+ invocation -> {
+ ContainerID cid = invocation.getArgument(0);
+ return containerReplicaMap.get(cid);
+ });
replicationManager = new ReplicationManager(
- config,
+ configuration,
containerManager,
- containerPlacementPolicy,
- eventQueue,
- SCMContext.emptyContext(),
- serviceManager,
+ placementPolicy,
+ eventPublisher,
+ scmContext,
nodeManager,
clock,
- scmHAManager,
- SCMDBDefinition.MOVE.getTable(dbStore),
+ legacyReplicationManager,
containerReplicaPendingOps);
-
- serviceManager.notifyStatusChanged();
- scmLogs.clearOutput();
- Thread.sleep(100L);
+ containerReplicaMap = new HashMap<>();
+ repConfig = new ECReplicationConfig(3, 2);
+ repReport = new ReplicationManagerReport();
+ underRep = new ArrayList<>();
+ overRep = new ArrayList<>();
}
- @AfterEach
- public void tearDown() throws Exception {
- containerStateManager.close();
- if (dbStore != null) {
- dbStore.close();
- }
-
- FileUtil.fullyDelete(testDir);
- }
-
- /**
- * Checks if restarting of replication manager works.
- */
@Test
- public void testReplicationManagerRestart() throws InterruptedException {
- Assertions.assertTrue(replicationManager.isRunning());
- replicationManager.stop();
- // Stop is a non-blocking call; it might take some time for the
- // ReplicationManager to shut down
- Thread.sleep(500);
- Assertions.assertFalse(replicationManager.isRunning());
- replicationManager.start();
- Assertions.assertTrue(replicationManager.isRunning());
- }
+ public void testHealthyContainer() throws ContainerNotFoundException {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, 1, 2, 3, 4, 5);
- /**
- * Open containers are not handled by ReplicationManager.
- * This test case makes sure that ReplicationManager doesn't take
- * any action on OPEN containers.
- */
- @Test
- public void testOpenContainer() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.OPEN);
- containerStateManager.addContainer(container.getProtobuf());
- replicationManager.processAll();
- eventQueue.processAll(1000);
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.OPEN));
- Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
+ ContainerHealthResult result = replicationManager.processContainer(
+ container, underRep, overRep, repReport);
+ Assert.assertEquals(ContainerHealthResult.HealthState.HEALTHY,
+ result.getHealthState());
+ Assert.assertEquals(0, underRep.size());
+ Assert.assertEquals(0, overRep.size());
}
- /**
- * If the container is in CLOSING state we resend close container command
- * to all the datanodes.
- */
@Test
- public void testClosingContainer() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
- final ContainerID id = container.containerID();
-
- containerStateManager.addContainer(container.getProtobuf());
-
- // Two replicas in CLOSING state
- final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
- randomDatanodeDetails(),
- randomDatanodeDetails());
-
- // One replica in OPEN state
- final DatanodeDetails datanode = randomDatanodeDetails();
- replicas.addAll(getReplicas(id, State.OPEN, datanode));
-
- for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
- }
-
- final int currentCloseCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+ public void testUnderReplicatedContainer() throws ContainerNotFoundException {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, 1, 2, 3, 4);
- // Update the OPEN to CLOSING
- for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
- containerStateManager.updateContainerReplica(id, replica);
- }
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSING));
- }
-
-
- /**
- * The container is QUASI_CLOSED but two of the replicas are still in
- * OPEN state. ReplicationManager should resend the close command to those
- * datanodes.
- */
- @Test
- public void testQuasiClosedContainerWithTwoOpenReplica() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.OPEN, 1000L, originNodeId, randomDatanodeDetails());
- final DatanodeDetails datanodeDetails = randomDatanodeDetails();
- final ContainerReplica replicaThree = getReplicas(
- id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
-
- final int currentCloseCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
- // Two of the replicas are in OPEN state
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.closeContainerCommand,
- replicaTwo.getDatanodeDetails()));
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.closeContainerCommand,
- replicaThree.getDatanodeDetails()));
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- }
-
- /**
- * When the container is in QUASI_CLOSED state, all the replicas are
- * also in QUASI_CLOSED state, and there is no quorum to force close
- * the container, ReplicationManager will not do anything.
- */
- @Test
- public void testHealthyQuasiClosedContainer() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
-
- // All the QUASI_CLOSED replicas have same originNodeId, so the
- // container will not be closed. ReplicationManager should take no action.
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- }
-
- /**
- * When a container is QUASI_CLOSED and we don't have quorum to force close
- * the container, the container should have all the replicas in QUASI_CLOSED
- * state, else ReplicationManager will take action.
- *
- * In this test case we make one of the replicas unhealthy; ReplicationManager
- * will send a delete container command to the datanode which has the
- * unhealthy replica.
- */
- @Test
- public void testQuasiClosedContainerWithUnhealthyReplica()
- throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- container.setUsedBytes(100);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
-
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- final int currentReplicateCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-
- // All the QUASI_CLOSED replicas have same originNodeId, so the
- // container will not be closed. ReplicationManager should take no action.
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
-
- // Make the first replica unhealthy
- final ContainerReplica unhealthyReplica = getReplicas(
- id, State.UNHEALTHY, 1000L, originNodeId,
- replicaOne.getDatanodeDetails());
- containerStateManager.updateContainerReplica(
- id, unhealthyReplica);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand,
- replicaOne.getDatanodeDetails()));
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
-
- // Now we will delete the unhealthy replica from in-memory.
- containerStateManager.removeContainerReplica(id, replicaOne);
-
- final long currentBytesToReplicate = replicationManager.getMetrics()
- .getNumReplicationBytesTotal();
-
- // The container is under replicated as unhealthy replica is removed
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- // We should get replicate command
- Assertions.assertEquals(currentReplicateCommandCount + 1,
- datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
- Assertions.assertEquals(currentReplicateCommandCount + 1,
- replicationManager.getMetrics().getNumReplicationCmdsSent());
- Assertions.assertEquals(currentBytesToReplicate + 100L,
- replicationManager.getMetrics().getNumReplicationBytesTotal());
- Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightReplication());
-
- // We should have one under replicated and one quasi_closed_stuck
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-
- // Now we add the missing replica back
- DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
- .getFirstDatanode(InflightType.REPLICATION, id);
- final ContainerReplica replicatedReplicaOne = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, targetDn);
- containerStateManager.updateContainerReplica(
- id, replicatedReplicaOne);
-
- final long currentReplicationCommandCompleted = replicationManager
- .getMetrics().getNumReplicationCmdsCompleted();
- final long currentBytesCompleted = replicationManager.getMetrics()
- .getNumReplicationBytesCompleted();
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
- Assertions.assertEquals(0, replicationManager.getMetrics()
- .getInflightReplication());
- Assertions.assertEquals(currentReplicationCommandCompleted + 1,
- replicationManager.getMetrics().getNumReplicationCmdsCompleted());
- Assertions.assertEquals(currentBytesCompleted + 100L,
- replicationManager.getMetrics().getNumReplicationBytesCompleted());
-
- report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(0, report.getStat(
+ ContainerHealthResult result = replicationManager.processContainer(
+ container, underRep, overRep, repReport);
+ Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+ result.getHealthState());
+ Assert.assertEquals(1, underRep.size());
+ Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
- /**
- * When a QUASI_CLOSED container is over replicated, ReplicationManager
- * deletes the excess replicas.
- */
- @Test
- public void testOverReplicatedQuasiClosedContainer() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- container.setUsedBytes(101);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaFour = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
-
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightDeletion());
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.OVER_REPLICATED));
-
- // Now we remove the replica according to inflight
- DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
- .getFirstDatanode(InflightType.DELETION, id);
- if (targetDn.equals(replicaOne.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(
- id, replicaOne);
- } else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(
- id, replicaTwo);
- } else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(
- id, replicaThree);
- } else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(
- id, replicaFour);
- }
-
- final long currentDeleteCommandCompleted = replicationManager.getMetrics()
- .getNumDeletionCmdsCompleted();
- final long deleteBytesCompleted =
- replicationManager.getMetrics().getNumDeletionBytesCompleted();
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(0, replicationManager.getMetrics()
- .getInflightDeletion());
- Assertions.assertEquals(currentDeleteCommandCompleted + 1,
- replicationManager.getMetrics().getNumDeletionCmdsCompleted());
- Assertions.assertEquals(deleteBytesCompleted + 101,
- replicationManager.getMetrics().getNumDeletionBytesCompleted());
-
- report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.OVER_REPLICATED));
- }
-
- /**
- * When a QUASI_CLOSED container is over replicated, ReplicationManager
- * deletes the excess replicas. While choosing the replica for deletion
- * ReplicationManager should prioritize unhealthy replica over QUASI_CLOSED
- * replica.
- */
- @Test
- public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
- throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaFour = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
-
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand,
- replicaOne.getDatanodeDetails()));
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightDeletion());
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.OVER_REPLICATED));
-
- final long currentDeleteCommandCompleted = replicationManager.getMetrics()
- .getNumDeletionCmdsCompleted();
- // Now we remove the replica to simulate deletion complete
- containerStateManager.removeContainerReplica(id, replicaOne);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertEquals(currentDeleteCommandCompleted + 1,
- replicationManager.getMetrics().getNumDeletionCmdsCompleted());
- Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(0, replicationManager.getMetrics()
- .getInflightDeletion());
-
- report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.OVER_REPLICATED));
- }
-
- /**
- * ReplicationManager should replicate a QUASI_CLOSED replica if it is
- * under replicated.
- */
@Test
- public void testUnderReplicatedQuasiClosedContainer() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- container.setUsedBytes(100);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
-
- final int currentReplicateCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
- final long currentBytesToReplicate = replicationManager.getMetrics()
- .getNumReplicationBytesTotal();
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentReplicateCommandCount + 1,
- datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
- Assertions.assertEquals(currentReplicateCommandCount + 1,
- replicationManager.getMetrics().getNumReplicationCmdsSent());
- Assertions.assertEquals(currentBytesToReplicate + 100,
- replicationManager.getMetrics().getNumReplicationBytesTotal());
- Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightReplication());
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-
- final long currentReplicateCommandCompleted = replicationManager
- .getMetrics().getNumReplicationCmdsCompleted();
- final long currentReplicateBytesCompleted = replicationManager
- .getMetrics().getNumReplicationBytesCompleted();
-
- // Now we add the replicated new replica
- DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
- .getFirstDatanode(InflightType.REPLICATION, id);
- final ContainerReplica replicatedReplicaThree = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, targetDn);
- containerStateManager.updateContainerReplica(
- id, replicatedReplicaThree);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertEquals(currentReplicateCommandCompleted + 1,
- replicationManager.getMetrics().getNumReplicationCmdsCompleted());
- Assertions.assertEquals(currentReplicateBytesCompleted + 100,
- replicationManager.getMetrics().getNumReplicationBytesCompleted());
- Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
- Assertions.assertEquals(0, replicationManager.getMetrics()
- .getInflightReplication());
-
- report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(0, report.getStat(
+ public void testUnderReplicatedContainerFixedByPending()
+ throws ContainerNotFoundException {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, 1, 2, 3, 4);
+ containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
+ MockDatanodeDetails.randomDatanodeDetails(), 5);
+
+ ContainerHealthResult result = replicationManager.processContainer(
+ container, underRep, overRep, repReport);
+ Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+ result.getHealthState());
+ // As the pending replication fixes the under replication, nothing is added
+ // to the under replication list.
+ Assert.assertEquals(0, underRep.size());
+ Assert.assertEquals(0, overRep.size());
+ Assert.assertTrue(((ContainerHealthResult.UnderReplicatedHealthResult)
+ result).isSufficientlyReplicatedAfterPending());
+ // As the pending add has not completed yet, the container is still
+ // under-replicated, so it is still counted as UNDER_REPLICATED in the
+ // report.
+ Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
- /**
- * When a QUASI_CLOSED container is under replicated, ReplicationManager
- * should re-replicate it. If there are any unhealthy replica, it has to
- * be deleted.
- *
- * In this test case, the container is QUASI_CLOSED and is under replicated
- * and also has an unhealthy replica.
- *
- * In the first iteration of ReplicationManager, it should re-replicate
- * the container so that it has enough replicas.
- *
- * In the second iteration, ReplicationManager should delete the unhealthy
- * replica.
- *
- * In the third iteration, ReplicationManager will re-replicate as the
- * container has again become under replicated after the unhealthy
- * replica has been deleted.
- *
- */
@Test
- public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
- throws IOException, InterruptedException,
- TimeoutException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- container.setUsedBytes(99);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
-
- final int currentReplicateCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
- final long currentBytesToDelete = replicationManager.getMetrics()
- .getNumDeletionBytesTotal();
-
- replicationManager.processAll();
- GenericTestUtils.waitFor(
- () -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand),
- 50, 5000);
-
- Optional<CommandForDatanode> replicateCommand = datanodeCommandHandler
- .getReceivedCommands().stream()
- .filter(c -> c.getCommand().getType()
- .equals(SCMCommandProto.Type.replicateContainerCommand))
- .findFirst();
-
- Assertions.assertTrue(replicateCommand.isPresent());
-
- DatanodeDetails newNode = createDatanodeDetails(
- replicateCommand.get().getDatanodeId());
- ContainerReplica newReplica = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
- containerStateManager.updateContainerReplica(id, newReplica);
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.UNDER_REPLICATED));
- Assertions.assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.UNHEALTHY));
-
- /*
- * We have report the replica to SCM, in the next ReplicationManager
- * iteration it should delete the unhealthy replica.
- */
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- // ReplicaTwo should be deleted, that is the unhealthy one
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand,
- replicaTwo.getDatanodeDetails()));
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(currentBytesToDelete + 99,
- replicationManager.getMetrics().getNumDeletionBytesTotal());
- Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightDeletion());
-
- containerStateManager.removeContainerReplica(id, replicaTwo);
-
- final long currentDeleteCommandCompleted = replicationManager.getMetrics()
- .getNumDeletionCmdsCompleted();
-
- report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.UNDER_REPLICATED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.UNHEALTHY));
- /*
- * We have now removed unhealthy replica, next iteration of
- * ReplicationManager should re-replicate the container as it
- * is under replicated now
- */
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(0, replicationManager.getMetrics()
- .getInflightDeletion());
- Assertions.assertEquals(currentDeleteCommandCompleted + 1,
- replicationManager.getMetrics().getNumDeletionCmdsCompleted());
-
- Assertions.assertEquals(currentReplicateCommandCount + 2,
- datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
- Assertions.assertEquals(currentReplicateCommandCount + 2,
- replicationManager.getMetrics().getNumReplicationCmdsSent());
- Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightReplication());
-
- report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- Assertions.assertEquals(1, report.getStat(
+ public void testUnderReplicatedAndUnrecoverable()
+ throws ContainerNotFoundException {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, 1, 2);
+
+ ContainerHealthResult result = replicationManager.processContainer(
+ container, underRep, overRep, repReport);
+ Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+ result.getHealthState());
+ // If it is unrecoverable, there is no point in putting it into the under
+ // replication list. It will be checked again on the next RM run.
+ Assert.assertEquals(0, underRep.size());
+ Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
- Assertions.assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.UNHEALTHY));
- }
-
-
- /**
- * When a container is QUASI_CLOSED and it has >50% of its replica
- * in QUASI_CLOSED state with unique origin node id,
- * ReplicationManager should force close the replica(s) with
- * highest BCSID.
- */
- @Test
- public void testQuasiClosedToClosed() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
- final ContainerID id = container.containerID();
- final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
- randomDatanodeDetails(),
- randomDatanodeDetails(),
- randomDatanodeDetails());
- containerStateManager.addContainer(container.getProtobuf());
- for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
- }
-
- final int currentCloseCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- // All the replicas have same BCSID, so all of them will be closed.
- Assertions.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
- Assertions.assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
- }
-
-
- /**
- * ReplicationManager should not take any action if the container is
- * CLOSED and healthy.
- */
- @Test
- public void testHealthyClosedContainer() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
- final ContainerID id = container.containerID();
- final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
- randomDatanodeDetails(),
- randomDatanodeDetails(),
- randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
- }
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSED));
- for (ReplicationManagerReport.HealthState s :
- ReplicationManagerReport.HealthState.values()) {
- Assertions.assertEquals(0, report.getStat(s));
- }
- }
-
- /**
- * ReplicationManager should close the unhealthy OPEN container.
- */
- @Test
- public void testUnhealthyOpenContainer() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.OPEN);
- final ContainerID id = container.containerID();
- final Set<ContainerReplica> replicas = getReplicas(id, State.OPEN,
- randomDatanodeDetails(),
- randomDatanodeDetails());
- replicas.addAll(getReplicas(id, State.UNHEALTHY, randomDatanodeDetails()));
-
- containerStateManager.addContainer(container.getProtobuf());
- for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
- }
-
- final CloseContainerEventHandler closeContainerHandler =
- Mockito.mock(CloseContainerEventHandler.class);
- eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Mockito.verify(closeContainerHandler, Mockito.times(1))
- .onMessage(id, eventQueue);
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.OPEN));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.OPEN_UNHEALTHY));
- }
-
- /**
- * ReplicationManager should skip send close command to unhealthy replica.
- */
- @Test
- public void testCloseUnhealthyReplica() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
- final ContainerID id = container.containerID();
- final Set<ContainerReplica> replicas = getReplicas(id, State.UNHEALTHY,
- randomDatanodeDetails());
- replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
- replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
-
- containerStateManager.addContainer(container.getProtobuf());
- for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
- }
-
- replicationManager.processAll();
- // Wait for EventQueue to call the event handler
- eventQueue.processAll(1000);
- Assertions.assertEquals(2, datanodeCommandHandler.getInvocation());
- }
-
- @Test
- public void testGeneratedConfig() {
- ReplicationManagerConfiguration rmc =
- OzoneConfiguration.newInstanceOf(ReplicationManagerConfiguration.class);
-
- //default is not included in ozone-site.xml but generated from annotation
- //to the ozone-site-generated.xml which should be loaded by the
- // OzoneConfiguration.
- Assertions.assertEquals(1800000, rmc.getEventTimeout());
-
- }
-
- @Test
- public void additionalReplicaScheduledWhenMisReplicated() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
- container.setUsedBytes(100);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
-
- // Ensure a mis-replicated status is returned for any containers in this
- // test where there are 3 replicas. When there are 2 or 4 replicas
- // the status returned will be healthy.
- Mockito.when(containerPlacementPolicy.validateContainerPlacement(
- Mockito.argThat(list -> list.size() == 3),
- Mockito.anyInt()
- )).thenAnswer(invocation -> {
- return new ContainerPlacementStatusDefault(1, 2, 3);
- });
-
- int currentReplicateCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
- final long currentBytesToReplicate = replicationManager.getMetrics()
- .getNumReplicationBytesTotal();
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- // At this stage, due to the mocked calls to validateContainerPlacement
- // the policy will not be satisfied, and replication will be triggered.
-
- Assertions.assertEquals(currentReplicateCommandCount + 1,
- datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
- Assertions.assertEquals(currentReplicateCommandCount + 1,
- replicationManager.getMetrics().getNumReplicationCmdsSent());
- Assertions.assertEquals(currentBytesToReplicate + 100,
- replicationManager.getMetrics().getNumReplicationBytesTotal());
- Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightReplication());
-
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSED));
- Assertions.assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.MIS_REPLICATED));
-
- // Now make it so that all containers seem mis-replicated no matter how
- // many replicas. This will test replicas are not scheduled if the new
- // replica does not fix the mis-replication.
- Mockito.when(containerPlacementPolicy.validateContainerPlacement(
- Mockito.anyList(),
- Mockito.anyInt()
- )).thenAnswer(invocation -> {
- return new ContainerPlacementStatusDefault(1, 2, 3);
- });
-
- currentReplicateCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- // At this stage, due to the mocked calls to validateContainerPlacement
- // the mis-replicated racks will not have improved, so expect to see nothing
- // scheduled.
- Assertions.assertEquals(currentReplicateCommandCount, datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
- Assertions.assertEquals(currentReplicateCommandCount,
- replicationManager.getMetrics().getNumReplicationCmdsSent());
- Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightReplication());
- }
-
- @Test
- public void overReplicatedButRemovingMakesMisReplicated() throws IOException {
- // In this test, the excess replica should not be removed.
- final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaFour = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaFive = getReplicas(
- id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
- containerStateManager.updateContainerReplica(id, replicaFive);
-
- // Ensure a mis-replicated status is returned for any containers in this
- // test where there are exactly 3 replicas checked.
- Mockito.when(containerPlacementPolicy.validateContainerPlacement(
- Mockito.argThat(list -> list.size() == 3),
- Mockito.anyInt()
- )).thenAnswer(
- invocation -> new ContainerPlacementStatusDefault(1, 2, 3));
-
- int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- // The unhealthy replica should be removed, but not the other replica
- // as each time we test with 3 replicas, Mockito ensures it returns
- // mis-replicated
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
-
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand,
- replicaFive.getDatanodeDetails()));
- Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightDeletion());
- assertOverReplicatedCount(1);
- }
-
- @Test
- public void testOverReplicatedAndPolicySatisfied() throws IOException {
- final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaFour = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
-
- Mockito.when(containerPlacementPolicy.validateContainerPlacement(
- Mockito.argThat(list -> list.size() == 3),
- Mockito.anyInt()
- )).thenAnswer(
- invocation -> new ContainerPlacementStatusDefault(2, 2, 3));
-
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertEquals(currentDeleteCommandCount + 1,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightDeletion());
-
- assertOverReplicatedCount(1);
- }
-
- @Test
- public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
- IOException {
- final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
- final ContainerID id = container.containerID();
- final UUID originNodeId = UUID.randomUUID();
- final ContainerReplica replicaOne = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaTwo = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaThree = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaFour = getReplicas(
- id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- final ContainerReplica replicaFive = getReplicas(
- id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
- containerStateManager.addContainer(container.getProtobuf());
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(
- id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
- containerStateManager.updateContainerReplica(id, replicaFive);
-
- Mockito.when(containerPlacementPolicy.validateContainerPlacement(
- Mockito.argThat(list -> list != null && list.size() <= 4),
- Mockito.anyInt()
- )).thenAnswer(
- invocation -> new ContainerPlacementStatusDefault(1, 2, 3));
-
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + 2,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertEquals(currentDeleteCommandCount + 2,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightDeletion());
- }
-
- /**
- * ReplicationManager should replicate an additional replica if there are
- * decommissioned replicas.
- */
- @Test
- public void testUnderReplicatedDueToDecommission() throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
- assertReplicaScheduled(2);
- assertUnderReplicatedCount(1);
- }
-
- /**
- * ReplicationManager should replicate an additional replica when all copies
- * are decommissioning.
- */
- @Test
- public void testUnderReplicatedDueToAllDecommission() throws IOException {
- runTestUnderReplicatedDueToAllDecommission(3);
- }
-
- Void runTestUnderReplicatedDueToAllDecommission(int expectedReplication)
- throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
- assertReplicaScheduled(expectedReplication);
- assertUnderReplicatedCount(1);
- return null;
- }
-
- @Test
- public void testReplicationLimit() throws Exception {
- runTestLimit(1, 0, 2, 0,
- () -> runTestUnderReplicatedDueToAllDecommission(1));
- }
-
- void runTestLimit(int replicationLimit, int deletionLimit,
- int expectedReplicationSkipped, int expectedDeletionSkipped,
- Callable<Void> testcase) throws Exception {
- createReplicationManager(replicationLimit, deletionLimit);
-
- final ReplicationManagerMetrics metrics = replicationManager.getMetrics();
- final long replicationSkipped = metrics.getInflightReplicationSkipped();
- final long deletionSkipped = metrics.getInflightDeletionSkipped();
-
- testcase.call();
-
- Assertions.assertEquals(replicationSkipped + expectedReplicationSkipped,
- metrics.getInflightReplicationSkipped());
- Assertions.assertEquals(deletionSkipped + expectedDeletionSkipped,
- metrics.getInflightDeletionSkipped());
-
- //reset limits for other tests.
- createReplicationManager(0, 0);
- }
-
- /**
- * ReplicationManager should not take any action when the container is
- * correctly replicated with decommissioned replicas still present.
- */
- @Test
- public void testCorrectlyReplicatedWithDecommission() throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
- assertReplicaScheduled(0);
- assertUnderReplicatedCount(0);
- }
-
- /**
- * ReplicationManager should replicate an additional replica when min rep
- * is not met for maintenance.
- */
- @Test
- public void testUnderReplicatedDueToMaintenance() throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- assertReplicaScheduled(1);
- assertUnderReplicatedCount(1);
- }
-
- /**
- * ReplicationManager should not replicate an additional replica when if
- * min replica for maintenance is 1 and another replica is available.
- */
- @Test
- public void testNotUnderReplicatedDueToMaintenanceMinRepOne()
- throws Exception {
- replicationManager.stop();
- ReplicationManagerConfiguration newConf =
- new ReplicationManagerConfiguration();
- newConf.setMaintenanceReplicaMinimum(1);
- dbStore.close();
- createReplicationManager(newConf);
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- assertReplicaScheduled(0);
- assertUnderReplicatedCount(0);
- }
-
- /**
- * ReplicationManager should replicate an additional replica when all copies
- * are going off line and min rep is 1.
- */
- @Test
- public void testUnderReplicatedDueToMaintenanceMinRepOne()
- throws Exception {
- replicationManager.stop();
- ReplicationManagerConfiguration newConf =
- new ReplicationManagerConfiguration();
- newConf.setMaintenanceReplicaMinimum(1);
- dbStore.close();
- createReplicationManager(newConf);
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- assertReplicaScheduled(1);
- assertUnderReplicatedCount(1);
- }
-
- /**
- * ReplicationManager should replicate additional replica when all copies
- * are going into maintenance.
- */
- @Test
- public void testUnderReplicatedDueToAllMaintenance() throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- assertReplicaScheduled(2);
- assertUnderReplicatedCount(1);
- }
-
- /**
- * ReplicationManager should not replicate additional replica sufficient
- * replica are available.
- */
- @Test
- public void testCorrectlyReplicatedWithMaintenance() throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- assertReplicaScheduled(0);
- assertUnderReplicatedCount(0);
- }
-
- /**
- * ReplicationManager should replicate additional replica when all copies
- * are decommissioning or maintenance.
- */
- @Test
- public void testUnderReplicatedWithDecommissionAndMaintenance()
- throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- assertReplicaScheduled(2);
- assertUnderReplicatedCount(1);
- }
-
- /**
- * ReplicationManager should replicate zero replica when all copies
- * are missing.
- */
- @Test
- public void testContainerWithMissingReplicas()
- throws IOException {
- createContainer(LifeCycleState.CLOSED);
- assertReplicaScheduled(0);
- assertUnderReplicatedCount(1);
- assertMissingCount(1);
- }
- /**
- * When a CLOSED container is over replicated, ReplicationManager
- * deletes the excess replicas. While choosing the replica for deletion
- * ReplicationManager should not attempt to remove a DECOMMISSION or
- * MAINTENANCE replica.
- */
- @Test
- public void testOverReplicatedClosedContainerWithDecomAndMaint()
- throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
- addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
- addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
- addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
- addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
-
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + 2,
- datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertEquals(currentDeleteCommandCount + 2,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getInflightDeletion());
- // Get the DECOM and Maint replica and ensure none of them are scheduled
- // for removal
- Set<ContainerReplica> decom =
- containerStateManager.getContainerReplicas(
- container.containerID())
- .stream()
- .filter(r -> r.getDatanodeDetails().getPersistedOpState() != IN_SERVICE)
- .collect(Collectors.toSet());
- for (ContainerReplica r : decom) {
- Assertions.assertFalse(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand,
- r.getDatanodeDetails()));
- }
- assertOverReplicatedCount(1);
- }
-
- /**
- * Replication Manager should not attempt to replicate from an unhealthy
- * (stale or dead) node. To test this, setup a scenario where a replia needs
- * to be created, but mark all nodes stale. That way, no new replica will be
- * scheduled.
- */
- @Test
- public void testUnderReplicatedNotHealthySource() throws IOException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, NodeStatus.inServiceStale(), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
- addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
- // There should be replica scheduled, but as all nodes are stale, nothing
- // gets scheduled.
- assertReplicaScheduled(0);
- assertUnderReplicatedCount(1);
- }
-
- /**
- * if all the prerequisites are satisfied, move should work as expected.
- */
- @Test
- public void testMove() throws IOException, NodeNotFoundException,
- InterruptedException, ExecutionException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- ContainerID id = container.containerID();
- ContainerReplica dn1 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
- CompletableFuture<MoveResult> cf =
- replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(scmLogs.getOutput().contains(
- "receive a move request about container"));
- Thread.sleep(100L);
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.replicateContainerCommand, dn3));
- Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
-
- //replicate container to dn3
- addReplicaToDn(container, dn3, CLOSED);
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
- Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.deleteContainerCommand));
- containerStateManager.removeContainerReplica(id, dn1);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED);
- }
-
- /**
- * if crash happened and restarted, move option should work as expected.
- */
- @Test
- public void testMoveCrashAndRestart() throws IOException,
- NodeNotFoundException, InterruptedException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- ContainerID id = container.containerID();
- ContainerReplica dn1 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
- replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(scmLogs.getOutput().contains(
- "receive a move request about container"));
- Thread.sleep(100L);
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.replicateContainerCommand, dn3));
- Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
-
- //crash happens, restart scm.
- //clear current inflight actions and reload inflightMove from DBStore.
- resetReplicationManager();
- replicationManager.getMoveScheduler()
- .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
- Assertions.assertTrue(replicationManager.getMoveScheduler()
- .getInflightMove().containsKey(id));
- MoveDataNodePair kv = replicationManager.getMoveScheduler()
- .getInflightMove().get(id);
- Assertions.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
- Assertions.assertEquals(kv.getTgt(), dn3);
- serviceManager.notifyStatusChanged();
-
- Thread.sleep(100L);
- // now, the container is not over-replicated,
- // so no deleteContainerCommand will be sent
- Assertions.assertFalse(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
- //replica does not exist in target datanode, so a replicateContainerCommand
- //will be sent again at notifyStatusChanged#onLeaderReadyAndOutOfSafeMode
- Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
-
-
- //replicate container to dn3, now, over-replicated
- addReplicaToDn(container, dn3, CLOSED);
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- //deleteContainerCommand is sent, but the src replica is not deleted now
- Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.deleteContainerCommand));
-
- //crash happens, restart scm.
- //clear current inflight actions and reload inflightMove from DBStore.
- resetReplicationManager();
- replicationManager.getMoveScheduler()
- .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
- Assertions.assertTrue(replicationManager.getMoveScheduler()
- .getInflightMove().containsKey(id));
- kv = replicationManager.getMoveScheduler()
- .getInflightMove().get(id);
- Assertions.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
- Assertions.assertEquals(kv.getTgt(), dn3);
- serviceManager.notifyStatusChanged();
-
- //after restart and the container is over-replicated now,
- //deleteContainerCommand will be sent again
- Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.deleteContainerCommand));
- containerStateManager.removeContainerReplica(id, dn1);
-
- //replica in src datanode is deleted now
- containerStateManager.removeContainerReplica(id, dn1);
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- //since the move is complete,so after scm crash and restart
- //inflightMove should not contain the container again
- resetReplicationManager();
- replicationManager.getMoveScheduler()
- .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
- Assertions.assertFalse(replicationManager.getMoveScheduler()
- .getInflightMove().containsKey(id));
-
- //completeableFuture is not stored in DB, so after scm crash and
- //restart ,completeableFuture is missing
- }
-
- /**
- * make sure RM does not delete replica if placement policy is not satisfied.
- */
- @Test
- public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
- throws IOException, NodeNotFoundException,
- InterruptedException, ExecutionException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- ContainerID id = container.containerID();
- ContainerReplica dn1 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- ContainerReplica dn2 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
- CompletableFuture<MoveResult> cf =
- replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
- Assertions.assertTrue(scmLogs.getOutput().contains(
- "receive a move request about container"));
- Thread.sleep(100L);
- Assertions.assertTrue(datanodeCommandHandler.received(
- SCMCommandProto.Type.replicateContainerCommand, dn4));
- Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
-
- //replicate container to dn4
- addReplicaToDn(container, dn4, CLOSED);
- //now, replication succeeds, but replica in dn2 lost,
- //and there are only tree replicas totally, so rm should
- //not delete the replica on dn1
- containerStateManager.removeContainerReplica(id, dn2);
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertFalse(datanodeCommandHandler.received(
- SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
-
- Assertions.assertTrue(cf.isDone() &&
- cf.get() == MoveResult.DELETE_FAIL_POLICY);
- }
-
-
- /**
- * test src and target datanode become unhealthy when moving.
- */
- @Test
- public void testDnBecameUnhealthyWhenMoving() throws IOException,
- NodeNotFoundException, InterruptedException, ExecutionException {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- ContainerID id = container.containerID();
- ContainerReplica dn1 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
- CompletableFuture<MoveResult> cf =
- replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(scmLogs.getOutput().contains(
- "receive a move request about container"));
-
- nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE));
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
-
- nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- addReplicaToDn(container, dn3, CLOSED);
- replicationManager.processAll();
- eventQueue.processAll(1000);
- nodeManager.setNodeStatus(dn1.getDatanodeDetails(),
- new NodeStatus(IN_SERVICE, STALE));
- replicationManager.processAll();
- eventQueue.processAll(1000);
-
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
- }
-
- /**
- * before Replication Manager generates a completablefuture for a move option,
- * some Prerequisites should be satisfied.
- */
- @Test
- public void testMovePrerequisites() throws IOException, NodeNotFoundException,
- InterruptedException, ExecutionException,
- InvalidStateTransitionException {
- //all conditions is met
- final ContainerInfo container = createContainer(LifeCycleState.OPEN);
- ContainerID id = container.containerID();
- ContainerReplica dn1 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- ContainerReplica dn2 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
- ContainerReplica dn4 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-
- CompletableFuture<MoveResult> cf;
- //the above move is executed successfully, so there may be some item in
- //inflightReplication or inflightDeletion. here we stop replication manager
- //to clear these states, which may impact the tests below.
- //we don't need a running replicationManamger now
- replicationManager.stop();
- Thread.sleep(100L);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.FAIL_NOT_RUNNING);
- replicationManager.start();
- Thread.sleep(100L);
-
- //container in not in OPEN state
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
- //open -> closing
- containerStateManager.updateContainerState(id.getProtobuf(),
- LifeCycleEvent.FINALIZE);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
- //closing -> quasi_closed
- containerStateManager.updateContainerState(id.getProtobuf(),
- LifeCycleEvent.QUASI_CLOSE);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
-
- //quasi_closed -> closed
- containerStateManager.updateContainerState(id.getProtobuf(),
- LifeCycleEvent.FORCE_CLOSE);
- Assertions.assertSame(LifeCycleState.CLOSED,
- containerStateManager.getContainer(id).getState());
-
- //Node is not in healthy state
- for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
- if (state != HEALTHY) {
- nodeManager.setNodeStatus(dn3,
- new NodeStatus(IN_SERVICE, state));
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
- cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
- }
- }
- nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
-
- //Node is not in IN_SERVICE state
- for (HddsProtos.NodeOperationalState state :
- HddsProtos.NodeOperationalState.values()) {
- if (state != IN_SERVICE) {
- nodeManager.setNodeStatus(dn3,
- new NodeStatus(state, HEALTHY));
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
- cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
- }
- }
- nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
-
- //container exists in target datanode
- cf = replicationManager.move(id, dn1.getDatanodeDetails(),
- dn2.getDatanodeDetails());
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
-
- //container does not exist in source datanode
- cf = replicationManager.move(id, dn3, dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
-
- //make container over relplicated to test the
- // case that container is in inflightDeletion
- ContainerReplica dn5 = addReplica(container,
- new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
- replicationManager.processAll();
- //waiting for inflightDeletion generation
- eventQueue.processAll(1000);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
- resetReplicationManager();
-
- //make the replica num be 2 to test the case
- //that container is in inflightReplication
- containerStateManager.removeContainerReplica(id, dn5);
- containerStateManager.removeContainerReplica(id, dn4);
- //replication manager should generate inflightReplication
- replicationManager.processAll();
- //waiting for inflightReplication generation
- eventQueue.processAll(1000);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
- Assertions.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
- }
-
- @Test
- public void testReplicateCommandTimeout() throws IOException {
- long timeout = new ReplicationManagerConfiguration().getEventTimeout();
-
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- assertReplicaScheduled(1);
-
- // Already a pending replica, so nothing scheduled
- assertReplicaScheduled(0);
-
- // Advance the clock past the timeout, and there should be a replica
- // scheduled
- clock.fastForward(timeout + 1000);
- assertReplicaScheduled(1);
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getNumReplicationCmdsTimeout());
- }
-
- @Test
- public void testDeleteCommandTimeout() throws
- IOException, InterruptedException {
- long timeout = new ReplicationManagerConfiguration().getEventTimeout();
-
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- assertDeleteScheduled(1);
-
- // Already a pending replica, so nothing scheduled
- assertReplicaScheduled(0);
-
- // Advance the clock past the timeout, and there should be a replica
- // scheduled
- clock.fastForward(timeout + 1000);
- assertDeleteScheduled(1);
- Assertions.assertEquals(1, replicationManager.getMetrics()
- .getNumDeletionCmdsTimeout());
+ Assert.assertEquals(1, repReport.getStat(
+ ReplicationManagerReport.HealthState.MISSING));
}
- /**
- * A closed empty container with all the replicas also closed and empty
- * should be deleted.
- * A container/ replica should be deemed empty when it has 0 keyCount even
- * if the usedBytes is not 0 (usedBytes should not be used to determine if
- * the container or replica is empty).
- */
@Test
- public void testDeleteEmptyContainer() throws Exception {
- runTestDeleteEmptyContainer(3);
- }
-
- Void runTestDeleteEmptyContainer(int expectedDelete) throws Exception {
- // Create container with usedBytes = 1000 and keyCount = 0
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 1000,
- 0);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- // Create a replica with usedBytes != 0 and keyCount = 0
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 0);
-
- assertDeleteScheduled(expectedDelete);
- return null;
+ public void testUnderAndOverReplicated()
+ throws ContainerNotFoundException {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, 1, 2, 3, 5, 5);
+
+ ContainerHealthResult result = replicationManager.processContainer(
+ container, underRep, overRep, repReport);
+ // If it is both under and over replicated, we set it to the most important
+ // state, which is under-replicated. When that is fixed, over replication
+ // will be handled.
+ Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+ result.getHealthState());
+ Assert.assertEquals(1, underRep.size());
+ Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(1, repReport.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
@Test
- public void testDeletionLimit() throws Exception {
- runTestLimit(0, 2, 0, 1,
- () -> runTestDeleteEmptyContainer(2));
+ public void testOverReplicated() throws ContainerNotFoundException {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, 1, 2, 3, 4, 5, 5);
+ ContainerHealthResult result = replicationManager.processContainer(
+ container, underRep, overRep, repReport);
+ Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
+ result.getHealthState());
+ Assert.assertEquals(0, underRep.size());
+ Assert.assertEquals(1, overRep.size());
+ Assert.assertEquals(1, repReport.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
- /**
- * A closed empty container with a non-empty replica should not be deleted.
- */
@Test
- public void testDeleteEmptyContainerNonEmptyReplica() throws Exception {
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 0,
- 0);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
- // Create the 3rd replica with non-zero key count and used bytes
- addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 1);
- assertDeleteScheduled(0);
- }
-
- private ContainerInfo createContainer(LifeCycleState containerState)
- throws IOException {
- return createContainer(containerState, CONTAINER_USED_BYTES_DEFAULT,
- CONTAINER_NUM_KEYS_DEFAULT);
- }
-
- private ContainerInfo createContainer(LifeCycleState containerState,
- long usedBytes, long numKeys) throws IOException {
- final ContainerInfo container = getContainer(containerState);
- container.setUsedBytes(usedBytes);
- container.setNumberOfKeys(numKeys);
- containerStateManager.addContainer(container.getProtobuf());
- return container;
- }
-
- private DatanodeDetails addNode(NodeStatus nodeStatus) {
- DatanodeDetails dn = randomDatanodeDetails();
- dn.setPersistedOpState(nodeStatus.getOperationalState());
- dn.setPersistedOpStateExpiryEpochSec(
- nodeStatus.getOpStateExpiryEpochSeconds());
- nodeManager.register(dn, nodeStatus);
- return dn;
- }
-
- private void resetReplicationManager() throws InterruptedException {
- replicationManager.stop();
- Thread.sleep(100L);
- replicationManager.start();
- Thread.sleep(100L);
- }
-
- private ContainerReplica addReplica(ContainerInfo container,
- NodeStatus nodeStatus, State replicaState)
- throws ContainerNotFoundException {
- DatanodeDetails dn = addNode(nodeStatus);
- return addReplicaToDn(container, dn, replicaState);
- }
-
- private ContainerReplica addReplica(ContainerInfo container,
- NodeStatus nodeStatus, State replicaState, long usedBytes, long numOfKeys)
- throws ContainerNotFoundException {
- DatanodeDetails dn = addNode(nodeStatus);
- return addReplicaToDn(container, dn, replicaState, usedBytes, numOfKeys);
- }
-
- private ContainerReplica addReplicaToDn(ContainerInfo container,
- DatanodeDetails dn, State replicaState)
- throws ContainerNotFoundException {
- // Using the same originID for all replica in the container set. If each
- // replica has a unique originID, it causes problems in ReplicationManager
- // when processing over-replicated containers.
- final UUID originNodeId =
- UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
- final ContainerReplica replica = getReplicas(container.containerID(),
- replicaState, container.getUsedBytes(), container.getNumberOfKeys(),
- 1000L, originNodeId, dn);
- containerStateManager
- .updateContainerReplica(container.containerID(), replica);
- return replica;
- }
-
- private ContainerReplica addReplicaToDn(ContainerInfo container,
- DatanodeDetails dn, State replicaState, long usedBytes, long numOfKeys)
+ public void testOverReplicatedFixByPending()
throws ContainerNotFoundException {
- // Using the same originID for all replica in the container set. If each
- // replica has a unique originID, it causes problems in ReplicationManager
- // when processing over-replicated containers.
- final UUID originNodeId =
- UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
- final ContainerReplica replica = getReplicas(container.containerID(),
- replicaState, usedBytes, numOfKeys, 1000L, originNodeId, dn);
- containerStateManager
- .updateContainerReplica(container.containerID(), replica);
- return replica;
- }
-
- private void assertReplicaScheduled(int delta) {
- final int currentReplicateCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentReplicateCommandCount + delta,
- datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.replicateContainerCommand));
- Assertions.assertEquals(currentReplicateCommandCount + delta,
- replicationManager.getMetrics().getNumReplicationCmdsSent());
- }
-
- private void assertDeleteScheduled(int delta) {
- final int currentDeleteCommandCount = datanodeCommandHandler
- .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
- replicationManager.processAll();
- eventQueue.processAll(1000);
- Assertions.assertEquals(currentDeleteCommandCount + delta,
- datanodeCommandHandler.getInvocationCount(
- SCMCommandProto.Type.deleteContainerCommand));
- Assertions.assertEquals(currentDeleteCommandCount + delta,
- replicationManager.getMetrics().getNumDeletionCmdsSent());
- }
-
- private void assertUnderReplicatedCount(int count) {
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(count, report.getStat(
- ReplicationManagerReport.HealthState.UNDER_REPLICATED));
- }
-
- private void assertMissingCount(int count) {
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(count, report.getStat(
- ReplicationManagerReport.HealthState.MISSING));
- }
-
- private void assertOverReplicatedCount(int count) {
- ReplicationManagerReport report = replicationManager.getContainerReport();
- Assertions.assertEquals(count, report.getStat(
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, 1, 2, 3, 4, 5, 5);
+ containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
+ MockDatanodeDetails.randomDatanodeDetails(), 5);
+ ContainerHealthResult result = replicationManager.processContainer(
+ container, underRep, overRep, repReport);
+ Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
+ result.getHealthState());
+ Assert.assertEquals(0, underRep.size());
+ // If the pending delete fixes the over-replication, nothing is added
+ // to the over replication list.
+ Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
- @AfterEach
- public void teardown() throws Exception {
- containerStateManager.close();
- replicationManager.stop();
- if (dbStore != null) {
- dbStore.close();
- }
- FileUtils.deleteDirectory(testDir);
+ private Set<ContainerReplica> addReplicas(ContainerInfo container,
+ int... indexes) {
+ final Set<ContainerReplica> replicas =
+ createReplicas(container.containerID(), indexes);
+ containerReplicaMap.put(container.containerID(), replicas);
+ return replicas;
}
- private static class DatanodeCommandHandler implements
- EventHandler<CommandForDatanode> {
-
- private AtomicInteger invocation = new AtomicInteger(0);
- private Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
- new HashMap<>();
- private List<CommandForDatanode> commands = new ArrayList<>();
-
- @Override
- public void onMessage(final CommandForDatanode command,
- final EventPublisher publisher) {
- final SCMCommandProto.Type type = command.getCommand().getType();
- commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0));
- commandInvocation.get(type).incrementAndGet();
- invocation.incrementAndGet();
- commands.add(command);
- }
-
- private int getInvocation() {
- return invocation.get();
- }
-
- private int getInvocationCount(SCMCommandProto.Type type) {
- return commandInvocation.containsKey(type) ?
- commandInvocation.get(type).get() : 0;
- }
-
- private List<CommandForDatanode> getReceivedCommands() {
- return commands;
- }
-
- /**
- * Returns true if the command handler has received the given
- * command type for the provided datanode.
- *
- * @param type Command Type
- * @param datanode DatanodeDetails
- * @return True if command was received, false otherwise
- */
- private boolean received(final SCMCommandProto.Type type,
- final DatanodeDetails datanode) {
- return commands.stream().anyMatch(dc ->
- dc.getCommand().getType().equals(type) &&
- dc.getDatanodeId().equals(datanode.getUuid()));
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org