You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/10/19 08:32:33 UTC
[ozone] branch master updated: HDDS-7058. EC: ReplicationManager - Implement ratis container replication check handler (#3802)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 237a9a1594 HDDS-7058. EC: ReplicationManager - Implement ratis container replication check handler (#3802)
237a9a1594 is described below
commit 237a9a15949299c700c7352019662429b0acf242
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Oct 19 09:32:27 2022 +0100
HDDS-7058. EC: ReplicationManager - Implement ratis container replication check handler (#3802)
---
.../replication/ContainerHealthResult.java | 69 +++
.../replication/LegacyReplicationManager.java | 1 -
.../RatisContainerReplicaCount.java | 87 +++-
.../container/replication/ReplicationManager.java | 19 +-
.../health/RatisReplicationCheckHandler.java | 201 ++++++++
.../TestRatisContainerReplicaCount.java | 58 ++-
.../health/TestRatisReplicationCheckHandler.java | 560 +++++++++++++++++++++
.../hdds/scm/node/TestDatanodeAdminMonitor.java | 2 +-
8 files changed, 987 insertions(+), 10 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index de290294e9..b438d6de4c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -106,6 +106,9 @@ public class ContainerHealthResult {
private final int remainingRedundancy;
private final boolean dueToDecommission;
private final boolean sufficientlyReplicatedAfterPending;
+ private boolean dueToMisReplication = false;
+ private boolean isMisReplicated = false;
+ private boolean isMisReplicatedAfterPending = false;
private final boolean unrecoverable;
private int requeueCount = 0;
@@ -119,6 +122,44 @@ public class ContainerHealthResult {
this.unrecoverable = unrecoverable;
}
+ /**
+ * Pass true to indicate the container is mis-replicated - ie it does not
+ * meet the placement policy.
+ * @param isMisRep True if the container is mis-replicated, false if not.
+ * @return this object to allow calls to be chained
+ */
+ public UnderReplicatedHealthResult
+ setMisReplicated(boolean isMisRep) {
+ this.isMisReplicated = isMisRep;
+ return this;
+ }
+
+ /**
+ * Pass true to indicate the container is mis-replicated after considering
+ * pending replicas scheduled for create or delete.
+ * @param isMisRep True if the container is mis-replicated considering
+ * pending replicas, or false if not.
+ * @return this object to allow calls to be chained
+ */
+ public UnderReplicatedHealthResult
+ setMisReplicatedAfterPending(boolean isMisRep) {
+ this.isMisReplicatedAfterPending = isMisRep;
+ return this;
+ }
+
+ /**
+ * If the container is ONLY under replicated due to mis-replication, pass
+ * true, otherwise pass false.
+ * @param dueToMisRep Pass true if the container has enough replicas but
+ * does not meet the placement policy.
+ * @return this object to allow calls to be chained
+ */
+ public UnderReplicatedHealthResult
+ setDueToMisReplication(boolean dueToMisRep) {
+ this.dueToMisReplication = dueToMisRep;
+ return this;
+ }
+
/**
* How many more replicas can be lost before the container is
* unreadable. For containers which are under-replicated due to decommission
@@ -187,6 +228,34 @@ public class ContainerHealthResult {
return sufficientlyReplicatedAfterPending;
}
+ /**
+ * Returns true if the container is mis-replicated, ignoring any pending
+ * replicas scheduled to be created.
+ * @return True if mis-replicated, ignoring pending
+ */
+ public boolean isMisReplicated() {
+ return isMisReplicated;
+ }
+
+ /**
+ * Returns true if the container is mis-replicated after taking account of
+ * pending replicas, which are scheduled to be created.
+ * @return true if mis-replicated after pending.
+ */
+ public boolean isMisReplicatedAfterPending() {
+ return isMisReplicatedAfterPending;
+ }
+
+ /**
+ * Returns true if the under replication is only due to mis-replication.
+ * In other words, the container has enough replicas, but they do not meet
+ * the placement policy.
+ * @return true if the under-replication is only due to mis-replication
+ */
+ public boolean isDueToMisReplication() {
+ return dueToMisReplication;
+ }
+
/**
* Indicates whether a container has enough replicas to be read. For Ratis
* at least one replica must be available. For EC, at least dataNum replicas
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 27c7a12e66..b58140ac31 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.RatisContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/RatisContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
similarity index 76%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/RatisContainerReplicaCount.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
index f25423d4ec..577fc6004d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/RatisContainerReplicaCount.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
@@ -15,10 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.container;
+package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import java.util.Set;
@@ -243,11 +244,27 @@ public class RatisContainerReplicaCount implements ContainerReplicaCount {
*/
@Override
public boolean isSufficientlyReplicated() {
- return missingReplicas() + inFlightDel <= 0;
+ return isSufficientlyReplicated(false);
}
/**
- * Return true is the container is over replicated. Decommission and
+ * Return true if the container is sufficiently replicated. Decommissioning
+ * and Decommissioned containers are ignored in this check, assuming they will
+ * eventually be removed from the cluster.
+ * This check ignores inflight additions if includePendingAdd is false;
+ * otherwise it will assume they complete ok.
+ *
+ * @return True if the container is sufficiently replicated and False
+ * otherwise.
+ */
+ public boolean isSufficientlyReplicated(boolean includePendingAdd) {
+ // Positive for under-rep, negative for over-rep
+ int delta = redundancyDelta(true, includePendingAdd);
+ return delta <= 0;
+ }
+
+ /**
+ * Return true if the container is over replicated. Decommission and
* maintenance containers are ignored for this check.
* The check ignores inflight additions, as they may fail, but it does
* consider inflight deletes, as they would reduce the over replication when
@@ -257,7 +274,67 @@ public class RatisContainerReplicaCount implements ContainerReplicaCount {
*/
@Override
public boolean isOverReplicated() {
- return missingReplicas() + inFlightDel < 0;
+ return isOverReplicated(true);
+ }
+
+ /**
+ * Return true if the container is over replicated. Decommission and
+ * maintenance containers are ignored for this check.
+ * The check ignores inflight additions, as they may fail, but it does
+ * consider inflight deletes if includePendingDelete is true.
+ *
+ * @return True if the container is over replicated, false otherwise.
+ */
+ public boolean isOverReplicated(boolean includePendingDelete) {
+ return getExcessRedundancy(includePendingDelete) > 0;
+ }
+
+ /**
+ * @return The number of excess redundant replicas.
+ */
+ public int getExcessRedundancy(boolean includePendingDelete) {
+ int excessRedundancy = redundancyDelta(includePendingDelete, false);
+ if (excessRedundancy >= 0) {
+ // either perfectly replicated or under replicated
+ return 0;
+ }
+ return -excessRedundancy;
+ }
+
+ /**
+ * Return the delta from the expected number of replicas, optionally
+ * considering inflight add and deletes.
+ * @param includePendingDelete
+ * @param includePendingAdd
+ * @return zero if perfectly replicated, a negative value for over replication
+ * and a positive value for under replication. The magnitude of the
+ * return value indicates how many replicas the container is over or
+ * under replicated by.
+ */
+ private int redundancyDelta(boolean includePendingDelete,
+ boolean includePendingAdd) {
+ int excessRedundancy = missingReplicas();
+ if (includePendingDelete) {
+ excessRedundancy += inFlightDel;
+ }
+ if (includePendingAdd) {
+ excessRedundancy -= inFlightAdd;
+ }
+ return excessRedundancy;
+ }
+
+ /**
+ * How many more replicas can be lost before the container is
+ * unreadable, assuming any inflight deletes will complete. For containers
+ * which are under-replicated due to decommission
+ * or maintenance only, the remaining redundancy will include those
+ * decommissioning or maintenance replicas, as they are technically still
+ * available until the datanode processes are stopped.
+ * @return Count of remaining redundant replicas.
+ */
+ public int getRemainingRedundancy() {
+ return Math.max(0,
+ healthyCount + decommissionCount + maintenanceCount - inFlightDel - 1);
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 05300d6a08..77142848b6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationChec
import org.apache.hadoop.hdds.scm.container.replication.health.HealthCheck;
import org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHandler;
import org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedContainerHandler;
+import org.apache.hadoop.hdds.scm.container.replication.health.RatisReplicationCheckHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -74,6 +75,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
/**
* Replication Manager (RM) is the one which is responsible for making sure
@@ -144,12 +146,14 @@ public class ReplicationManager implements SCMService {
private final Clock clock;
private final ContainerReplicaPendingOps containerReplicaPendingOps;
private final ECReplicationCheckHandler ecReplicationCheckHandler;
+ private final RatisReplicationCheckHandler ratisReplicationCheckHandler;
private final EventPublisher eventPublisher;
private final ReentrantLock lock = new ReentrantLock();
private ReplicationQueue replicationQueue;
private final ECUnderReplicationHandler ecUnderReplicationHandler;
private final ECOverReplicationHandler ecOverReplicationHandler;
private final int maintenanceRedundancy;
+ private final int ratisMaintenanceMinReplicas;
private Thread underReplicatedProcessorThread;
private Thread overReplicatedProcessorThread;
private final UnderReplicatedProcessor underReplicatedProcessor;
@@ -190,9 +194,13 @@ public class ReplicationManager implements SCMService {
this.containerReplicaPendingOps = replicaPendingOps;
this.legacyReplicationManager = legacyReplicationManager;
this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
+ this.ratisReplicationCheckHandler =
+ new RatisReplicationCheckHandler(containerPlacement);
this.nodeManager = nodeManager;
this.replicationQueue = new ReplicationQueue();
this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
+ this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();
+
ecUnderReplicationHandler = new ECUnderReplicationHandler(
ecReplicationCheckHandler, containerPlacement, conf, nodeManager);
ecOverReplicationHandler =
@@ -212,7 +220,8 @@ public class ReplicationManager implements SCMService {
.addNext(new ClosingContainerHandler(this))
.addNext(new QuasiClosedContainerHandler(this))
.addNext(new ClosedWithMismatchedReplicasHandler(this))
- .addNext(ecReplicationCheckHandler);
+ .addNext(ecReplicationCheckHandler)
+ .addNext(ratisReplicationCheckHandler);
start();
}
@@ -466,10 +475,16 @@ public class ReplicationManager implements SCMService {
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
+ // There is a different config for EC and Ratis maintenance
+ // minimum replicas, so we must pass through the correct one.
+ int maintRedundancy = maintenanceRedundancy;
+ if (containerInfo.getReplicationType() == RATIS) {
+ maintRedundancy = ratisMaintenanceMinReplicas;
+ }
ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
.setContainerInfo(containerInfo)
.setContainerReplicas(replicas)
- .setMaintenanceRedundancy(maintenanceRedundancy)
+ .setMaintenanceRedundancy(maintRedundancy)
.setReport(report)
.setPendingOps(pendingOps)
.setReplicationQueue(repQueue)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java
new file mode 100644
index 0000000000..cb53c59e08
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+
+/**
+ * Class to determine the health state of a Ratis Container. Given the container
+ * and current replica details, along with replicas pending add and delete,
+ * this class will return a ContainerHealthResult indicating if the container
+ * is healthy, or under / over replicated etc.
+ */
+public class RatisReplicationCheckHandler extends AbstractCheck {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(RatisReplicationCheckHandler.class);
+
+ /**
+ * PlacementPolicy which is used to identify where a container
+ * should be replicated.
+ */
+ private final PlacementPolicy ratisContainerPlacement;
+
+ public RatisReplicationCheckHandler(PlacementPolicy containerPlacement) {
+ this.ratisContainerPlacement = containerPlacement;
+ }
+
+ @Override
+ public boolean handle(ContainerCheckRequest request) {
+ if (request.getContainerInfo().getReplicationType() != RATIS) {
+ // This handler is only for Ratis containers.
+ return false;
+ }
+ ReplicationManagerReport report = request.getReport();
+ ContainerInfo container = request.getContainerInfo();
+ ContainerHealthResult health = checkHealth(request);
+ if (health.getHealthState() == ContainerHealthResult.HealthState.HEALTHY) {
+ // If the container is healthy, there is nothing else to do in this
+ // handler, so return unhandled to let any further handlers be tried.
+ return false;
+ }
+ if (health.getHealthState()
+ == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED,
+ container.containerID());
+ ContainerHealthResult.UnderReplicatedHealthResult underHealth
+ = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
+ if (underHealth.isUnrecoverable()) {
+ report.incrementAndSample(ReplicationManagerReport.HealthState.MISSING,
+ container.containerID());
+ }
+ if (underHealth.isMisReplicated()) {
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED,
+ container.containerID());
+ }
+ // TODO - if it is unrecoverable, should we return false so other
+ // handlers can be tried?
+ if (!underHealth.isUnrecoverable() &&
+ (underHealth.isMisReplicatedAfterPending() ||
+ !underHealth.isSufficientlyReplicatedAfterPending())) {
+ request.getReplicationQueue().enqueue(underHealth);
+ }
+ return true;
+ }
+
+ if (health.getHealthState()
+ == ContainerHealthResult.HealthState.OVER_REPLICATED) {
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED,
+ container.containerID());
+ ContainerHealthResult.OverReplicatedHealthResult overHealth
+ = ((ContainerHealthResult.OverReplicatedHealthResult) health);
+ if (!overHealth.isSufficientlyReplicatedAfterPending()) {
+ request.getReplicationQueue().enqueue(overHealth);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public ContainerHealthResult checkHealth(ContainerCheckRequest request) {
+ ContainerInfo container = request.getContainerInfo();
+ Set<ContainerReplica> replicas = request.getContainerReplicas();
+ List<ContainerReplicaOp> replicaPendingOps = request.getPendingOps();
+ // Note that this setting is minReplicasForMaintenance. For EC the variable
+ // is defined as remainingRedundancy which is subtly different.
+ int minReplicasForMaintenance = request.getMaintenanceRedundancy();
+ int pendingAdd = 0;
+ int pendingDelete = 0;
+ for (ContainerReplicaOp op : replicaPendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ pendingAdd++;
+ } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ pendingDelete++;
+ }
+ }
+ int requiredNodes = container.getReplicationConfig().getRequiredNodes();
+
+ // RatisContainerReplicaCount uses the minReplicasForMaintenance rather
+ // than remainingRedundancy which ECContainerReplicaCount uses.
+ RatisContainerReplicaCount replicaCount =
+ new RatisContainerReplicaCount(container, replicas, pendingAdd,
+ pendingDelete, requiredNodes, minReplicasForMaintenance);
+
+ ContainerPlacementStatus placementStatus =
+ getPlacementStatus(replicas, requiredNodes, Collections.emptyList());
+
+ ContainerPlacementStatus placementStatusWithPending = placementStatus;
+ if (replicaPendingOps.size() > 0) {
+ placementStatusWithPending =
+ getPlacementStatus(replicas, requiredNodes, replicaPendingOps);
+ }
+ boolean sufficientlyReplicated
+ = replicaCount.isSufficientlyReplicated(false);
+ boolean isPolicySatisfied = placementStatus.isPolicySatisfied();
+ if (!sufficientlyReplicated || !isPolicySatisfied) {
+ ContainerHealthResult.UnderReplicatedHealthResult result =
+ new ContainerHealthResult.UnderReplicatedHealthResult(
+ container, replicaCount.getRemainingRedundancy(),
+ isPolicySatisfied
+ && replicas.size() - pendingDelete >= requiredNodes,
+ replicaCount.isSufficientlyReplicated(true),
+ replicaCount.isUnrecoverable());
+ result.setMisReplicated(!isPolicySatisfied)
+ .setMisReplicatedAfterPending(
+ !placementStatusWithPending.isPolicySatisfied())
+ .setDueToMisReplication(
+ !isPolicySatisfied && replicaCount.isSufficientlyReplicated());
+ return result;
+ }
+
+ boolean isOverReplicated = replicaCount.isOverReplicated(false);
+ if (isOverReplicated) {
+ boolean repOkWithPending = !replicaCount.isOverReplicated(true);
+ return new ContainerHealthResult.OverReplicatedHealthResult(
+ container, replicaCount.getExcessRedundancy(false), repOkWithPending);
+ }
+ // No issues detected, just return healthy.
+ return new ContainerHealthResult.HealthyResult(container);
+ }
+
+ /**
+ * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
+ * and then check if the list meets the container placement policy.
+ * @param replicas List of containerReplica
+ * @param replicationFactor Expected Replication Factor of the container
+ * @return ContainerPlacementStatus indicating if the policy is met or not
+ */
+ private ContainerPlacementStatus getPlacementStatus(
+ Set<ContainerReplica> replicas, int replicationFactor,
+ List<ContainerReplicaOp> pendingOps) {
+
+ Set<DatanodeDetails> replicaDns = replicas.stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toSet());
+ for (ContainerReplicaOp op : pendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ replicaDns.add(op.getTarget());
+ }
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ replicaDns.remove(op.getTarget());
+ }
+ }
+ return ratisContainerPlacement.validateContainerPlacement(
+ new ArrayList<>(replicaDns), replicationFactor);
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java
index 5ceaec39bf..7d4947dd64 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.RatisContainerReplicaCount;
+import org.junit.Assert;
import org.junit.jupiter.api.Test;
import java.util.HashSet;
@@ -433,6 +433,62 @@ class TestRatisContainerReplicaCount {
assertFalse(rcnt.isSufficientlyReplicated());
}
+ @Test
+ void testOverReplicatedWithAndWithoutPending() {
+ Set<ContainerReplica> replica = registerNodes(IN_SERVICE, IN_SERVICE,
+ IN_SERVICE, IN_SERVICE, IN_SERVICE);
+ ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED);
+ RatisContainerReplicaCount rcnt =
+ new RatisContainerReplicaCount(container, replica, 0, 2, 3, 2);
+ assertTrue(rcnt.isOverReplicated(false));
+ assertFalse(rcnt.isOverReplicated(true));
+ assertEquals(2, rcnt.getExcessRedundancy(false));
+ assertEquals(0, rcnt.getExcessRedundancy(true));
+ }
+
+ @Test
+ void testRemainingRedundancy() {
+ Set<ContainerReplica> replica = registerNodes(IN_SERVICE, IN_SERVICE,
+ IN_SERVICE, IN_SERVICE);
+ ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED);
+ RatisContainerReplicaCount rcnt =
+ new RatisContainerReplicaCount(container, replica, 0, 1, 3, 2);
+ Assert.assertEquals(2, rcnt.getRemainingRedundancy());
+ replica = registerNodes(IN_SERVICE);
+ rcnt =
+ new RatisContainerReplicaCount(container, replica, 0, 0, 3, 2);
+ Assert.assertEquals(0, rcnt.getRemainingRedundancy());
+ rcnt =
+ new RatisContainerReplicaCount(container, replica, 0, 1, 3, 2);
+ Assert.assertEquals(0, rcnt.getRemainingRedundancy());
+ }
+
+ @Test
+ void testSufficientlyReplicatedWithAndWithoutPending() {
+ Set<ContainerReplica> replica = registerNodes(IN_SERVICE, IN_SERVICE);
+ ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED);
+ RatisContainerReplicaCount rcnt =
+ new RatisContainerReplicaCount(container, replica, 0, 0, 3, 2);
+ assertFalse(rcnt.isSufficientlyReplicated(true));
+ assertFalse(rcnt.isSufficientlyReplicated(false));
+
+ rcnt =
+ new RatisContainerReplicaCount(container, replica, 1, 0, 3, 2);
+ assertTrue(rcnt.isSufficientlyReplicated(true));
+ assertFalse(rcnt.isSufficientlyReplicated(false));
+
+ replica = registerNodes(IN_SERVICE, IN_SERVICE, IN_SERVICE);
+ rcnt =
+ new RatisContainerReplicaCount(container, replica, 0, 1, 3, 2);
+ assertFalse(rcnt.isSufficientlyReplicated(false));
+ assertFalse(rcnt.isSufficientlyReplicated(true));
+ rcnt =
+ new RatisContainerReplicaCount(container, replica, 1, 1, 3, 2);
+ assertFalse(rcnt.isSufficientlyReplicated(false));
+ assertTrue(rcnt.isSufficientlyReplicated(true));
+
+ }
+
private void validate(RatisContainerReplicaCount rcnt,
boolean sufficientlyReplicated, int replicaDelta,
boolean overReplicated) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestRatisReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestRatisReplicationCheckHandler.java
new file mode 100644
index 0000000000..d8dd69284b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestRatisReplicationCheckHandler.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+
+/**
+ * Tests for the RatisReplicationCheckHandler class.
+ */
+public class TestRatisReplicationCheckHandler {
+
+ private RatisReplicationCheckHandler healthCheck;
+ private ReplicationConfig repConfig;
+ private PlacementPolicy containerPlacementPolicy;
+ private ReplicationQueue repQueue;
+ private ContainerCheckRequest.Builder requestBuilder;
+ private ReplicationManagerReport report;
+ private int maintenanceRedundancy = 2;
+
+ @Before
+ public void setup() throws IOException {
+ containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
+ Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation ->
+ new ContainerPlacementStatusDefault(2, 2, 3));
+ healthCheck = new RatisReplicationCheckHandler(containerPlacementPolicy);
+ repConfig = RatisReplicationConfig.getInstance(THREE);
+ repQueue = new ReplicationQueue();
+ report = new ReplicationManagerReport();
+ requestBuilder = new ContainerCheckRequest.Builder()
+ .setReplicationQueue(repQueue)
+ .setMaintenanceRedundancy(maintenanceRedundancy)
+ .setPendingOps(Collections.emptyList())
+ .setReport(report);
+ }
+
+ @Test
+ public void testReturnFalseForNonRatis() {
+ ContainerInfo container =
+ createContainerInfo(new ECReplicationConfig(3, 2));
+ Set<ContainerReplica> replicas =
+ createReplicas(container.containerID(), 1, 2, 3, 4);
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ Assert.assertFalse(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void testHealthyContainerIsHealthy() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0, 0);
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ ContainerHealthResult result =
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+ Assert.assertFalse(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void testUnderReplicatedContainerIsUnderReplicated() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0);
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedContainerDueToPendingDelete() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0, 0);
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedContainerFixedWithPending() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0);
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+ requestBuilder.setContainerReplicas(replicas)
+ .setPendingOps(pending)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ // Fixed with pending, so nothing added to the queue
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ // Still under replicated until the pending complete
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedDueToDecommission() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 0), Pair.of(DECOMMISSIONING, 0),
+ Pair.of(DECOMMISSIONED, 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedDueToDecommissionFixedWithPending() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(DECOMMISSIONED, 0));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setPendingOps(pending)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getRemainingRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ // Nothing queued as inflight replicas will fix it.
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ // Still under replicated in the report until pending complete
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedDueToDecommissionAndMissing() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 0), Pair.of(DECOMMISSIONED, 0));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setPendingOps(pending)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedAndUnrecoverable() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = Collections.EMPTY_SET;
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(0, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+ Assert.assertTrue(result.isUnrecoverable());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ // Unrecoverable, so not added to the queue.
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MISSING));
+ }
+
+ @Test
+ public void testOverReplicatedContainer() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0));
+
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setPendingOps(pending)
+ .setContainerInfo(container);
+ OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(4, result.getExcessRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+ @Test
+ public void testOverReplicatedContainerFixedByPending() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0));
+
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setPendingOps(pending)
+ .setContainerInfo(container);
+ OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getExcessRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ // Fixed by pending so nothing queued.
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ // Still over replicated, so the report should contain it
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+ @Test
+ public void testOverReplicatedContainerWithMaintenance() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_MAINTENANCE, 0), Pair.of(DECOMMISSIONED, 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getExcessRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+ @Test
+ public void testOverReplicatedContainerDueToMaintenanceIsHealthy() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_MAINTENANCE, 0),
+ Pair.of(IN_MAINTENANCE, 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ ContainerHealthResult result =
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+ Assert.assertFalse(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ Assert.assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedWithMisReplication() {
+ Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation ->
+ new ContainerPlacementStatusDefault(1, 2, 3));
+
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0);
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+ Assert.assertTrue(result.isMisReplicated());
+ Assert.assertTrue(result.isMisReplicatedAfterPending());
+ Assert.assertFalse(result.isDueToMisReplication());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedWithMisReplicationFixedByPending() {
+ Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation -> {
+ List<DatanodeDetails> dns = invocation.getArgument(0);
+ // If the number of DNs is 3 or less make it be mis-replicated
+ if (dns.size() <= 3) {
+ return new ContainerPlacementStatusDefault(1, 2, 3);
+ } else {
+ return new ContainerPlacementStatusDefault(2, 2, 3);
+ }
+ });
+
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0);
+
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+ Assert.assertTrue(result.isMisReplicated());
+ Assert.assertFalse(result.isMisReplicatedAfterPending());
+ Assert.assertFalse(result.isDueToMisReplication());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedOnlyDueToMisReplication() {
+ Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation ->
+ new ContainerPlacementStatusDefault(1, 2, 3));
+
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0, 0);
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getRemainingRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+ Assert.assertTrue(result.isMisReplicated());
+ Assert.assertTrue(result.isMisReplicatedAfterPending());
+ Assert.assertTrue(result.isDueToMisReplication());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedOnlyDueToMisReplicationFixByPending() {
+ Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation -> {
+ List<DatanodeDetails> dns = invocation.getArgument(0);
+ // If the number of DNs is 3 or less make it be mis-replicated
+ if (dns.size() <= 3) {
+ return new ContainerPlacementStatusDefault(1, 2, 3);
+ } else {
+ return new ContainerPlacementStatusDefault(2, 2, 3);
+ }
+ });
+
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 0, 0, 0);
+
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending);
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(requestBuilder.build());
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getRemainingRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+ Assert.assertTrue(result.isMisReplicated());
+ Assert.assertFalse(result.isMisReplicatedAfterPending());
+ Assert.assertTrue(result.isDueToMisReplication());
+
+ Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 8afe2a1810..095d137a5a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.RatisContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org