You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2019/11/28 08:38:06 UTC
[hadoop-ozone] branch HDDS-1880-Decom updated: HDDS-2459. Refactor
ReplicationManager to consider maintenance states
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
new 920b8c5 HDDS-2459. Refactor ReplicationManager to consider maintenance states
920b8c5 is described below
commit 920b8c5eddc6f25138692636a66270ee22c8dfa4
Author: S O'Donnell <so...@cloudera.com>
AuthorDate: Thu Nov 28 09:32:13 2019 +0100
HDDS-2459. Refactor ReplicationManager to consider maintenance states
Closes #262
---
.../proto/StorageContainerDatanodeProtocol.proto | 2 +
.../hdds/scm/container/ContainerReplicaCount.java | 241 ++++++++++++
.../hdds/scm/container/ReplicationManager.java | 224 +++++++----
.../apache/hadoop/hdds/scm/node/NodeStatus.java | 69 ++++
.../hdds/scm/server/StorageContainerManager.java | 3 +-
.../hdds/scm/container/TestReplicationManager.java | 425 ++++++++++++++++++++-
.../TestSCMContainerPlacementRackAware.java | 6 -
.../states/TestContainerReplicaCount.java | 333 ++++++++++++++++
.../hdds/scm/node/TestDatanodeAdminMonitor.java | 3 +
.../hdds/scm/safemode/TestSafeModeHandler.java | 6 +-
.../scm/node/TestDecommissionAndMaintenance.java | 1 -
11 files changed, 1229 insertions(+), 84 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 45a1db6..6c30ba1 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -185,6 +185,8 @@ message ContainerReplicaProto {
CLOSED = 4;
UNHEALTHY = 5;
INVALID = 6;
+ DECOMMISSIONED = 7;
+ MAINTENANCE = 8;
}
required int64 containerID = 1;
required State state = 2;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
new file mode 100644
index 0000000..a7ea56d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import java.util.Set;
+
+/**
+ * Snapshot of a container's replicas, its in-flight replica adds and deletes,
+ * its replication factor and the minimum number of healthy replicas required
+ * for maintenance. It is used to determine if the container is over or under
+ * replicated and how many replicas need to be created or removed. Counts are
+ * computed once in the constructor; getReplica() returns the caller's Set.
+ */
+public class ContainerReplicaCount {
+
+ private int healthyCount = 0;
+ private int decommissionCount = 0;
+ private int maintenanceCount = 0;
+ private int inFlightAdd = 0;
+ private int inFlightDel = 0;
+ private int repFactor;
+ private int minHealthyForMaintenance;
+ private Set<ContainerReplica> replica;
+
+ public ContainerReplicaCount(Set<ContainerReplica> replica, int inFlightAdd,
+ int inFlightDelete, int replicationFactor,
+ int minHealthyForMaintenance) {
+ this.healthyCount = 0;
+ this.decommissionCount = 0;
+ this.maintenanceCount = 0;
+ this.inFlightAdd = inFlightAdd;
+ this.inFlightDel = inFlightDelete;
+ this.repFactor = replicationFactor;
+ this.replica = replica;
+ this.minHealthyForMaintenance
+ = Math.min(this.repFactor, minHealthyForMaintenance);
+ // Every state except DECOMMISSIONED and MAINTENANCE is counted as healthy.
+ for (ContainerReplica cr : this.replica) {
+ ContainerReplicaProto.State state = cr.getState();
+ if (state == ContainerReplicaProto.State.DECOMMISSIONED) {
+ decommissionCount++;
+ } else if (state == ContainerReplicaProto.State.MAINTENANCE) {
+ maintenanceCount++;
+ } else {
+ healthyCount++;
+ }
+ }
+ }
+
+ public int getHealthyCount() {
+ return healthyCount;
+ }
+
+ public int getDecommissionCount() {
+ return decommissionCount;
+ }
+
+ public int getMaintenanceCount() {
+ return maintenanceCount;
+ }
+
+ public int getReplicationFactor() {
+ return repFactor;
+ }
+ // Returns the Set given to the constructor itself, not a copy.
+ public Set<ContainerReplica> getReplica() {
+ return replica;
+ }
+ // NOTE(review): the "inFightDel" label below looks like a typo of inFlightDel.
+ @Override
+ public String toString() {
+ return "Replica Count: "+replica.size()+
+ " Healthy Count: "+healthyCount+
+ " Decommission Count: "+decommissionCount+
+ " Maintenance Count: "+maintenanceCount+
+ " inFlightAdd Count: "+inFlightAdd+
+ " inFightDel Count: "+inFlightDel+
+ " ReplicationFactor: "+repFactor+
+ " minMaintenance Count: "+minHealthyForMaintenance;
+ }
+
+ /**
+ * Calculates the delta of replicas which need to be created or removed to
+ * ensure the container is correctly replicated, taking inflight adds and
+ * deletes into account.
+ *
+ * When considering inflight operations, it is assumed any operation may
+ * fail. To consider the worst case and avoid data loss, when removing
+ * replicas we assume a delete will succeed and an add will fail. In this way,
+ * we avoid scheduling too many deletes, which could result in data loss.
+ *
+ * Decisions around over-replication are made only on healthy replicas,
+ * ignoring any in maintenance and also any inflight adds. InFlight adds are
+ * ignored, as they may not complete, so if we have:
+ *
+ * H, H, H, IN_FLIGHT_ADD
+ *
+ * And then schedule a delete, we could end up under-replicated (add fails,
+ * delete completes). It is better to let the inflight operations complete
+ * and then deal with any further over or under replication.
+ *
+ * For maintenance replicas, assuming replication factor 3, and minHealthy
+ * 2, it is possible for all 3 hosts to be put into maintenance, leaving the
+ * following (H = healthy, M = maintenance):
+ *
+ * H, H, M, M, M
+ *
+ * Even though we are tracking 5 replicas, this is not over replicated as we
+ * ignore the maintenance copies. Later, the replicas could look like:
+ *
+ * H, H, H, H, M
+ *
+ * At this stage, the container is over replicated by 1, so one replica can be
+ * removed.
+ *
+ * When exactly replication factor healthy replicas exist, the result is
+ * never negative, so inflight adds cannot trigger removals, while inflight
+ * deletes not offset by inflight adds increase the replicas needed.
+ *
+ * For under replicated containers we do consider inflight add and delete to
+ * avoid scheduling more adds than needed. There is additional logic around
+ * containers with maintenance replicas to ensure minHealthyForMaintenance
+ * replicas are maintained.
+ *
+ * @return Delta of replicas needed. Negative indicates over replication and
+ * replicas should be removed. Positive indicates under replication and
+ * replicas should be added. Zero indicates nothing needs to be scheduled
+ * now, possibly because inflight operations cover the difference.
+ */
+ public int additionalReplicaNeeded() {
+ int delta = missingReplicas();
+
+ if (delta < 0) {
+ // Over replicated, so may need to remove a replica. Do not consider
+ // inFlightAdds, as they may fail, but do consider inFlightDel which
+ // will reduce the over-replication if it completes.
+ // Note this could make the delta positive if there are too many in flight
+ // deletes, which will result in an additional replica being scheduled.
+ return delta + inFlightDel;
+ } else {
+ // May be under or perfectly replicated.
+ // We must consider in flight add and delete when calculating the new
+ // replicas needed, but we bound the lower limit at zero to allow
+ // inflight operations to complete before handling any potential over
+ // replication.
+ return Math.max(0, delta - inFlightAdd + inFlightDel);
+ }
+ }
+
+ /**
+ * Returns the count of replicas which need to be created or removed to
+ * ensure the container is perfectly replicated, ignoring inflight operations.
+ * Maintenance replicas count towards the replication factor, but at least
+ * minHealthyForMaintenance healthy replicas are always required.
+ *
+ * Decisions around over-replication are made only on healthy replicas,
+ * ignoring any in maintenance. For example, if we have:
+ *
+ * H, H, H, M, M
+ *
+ * This will not be considered over replicated until one of the Maintenance
+ * replicas moves to Healthy.
+ *
+ * If the container is perfectly replicated, zero will be returned.
+ *
+ * If it is under replicated a positive value will be returned, indicating
+ * how many replicas must be added.
+ *
+ * If it is over replicated a negative value will be returned, indicating how
+ * many replicas to remove.
+ *
+ * @return Zero if the container is perfectly replicated, a positive value
+ * for under replicated and a negative value for over replicated.
+ */
+ private int missingReplicas() {
+ int delta = repFactor - healthyCount;
+
+ if (delta < 0) {
+ // Over replicated, so may need to remove a replica.
+ return delta;
+ } else if (delta > 0) {
+ // May be under-replicated, depending on maintenance.
+ delta = Math.max(0, delta - maintenanceCount);
+ int neededHealthy =
+ Math.max(0, minHealthyForMaintenance - healthyCount);
+ delta = Math.max(neededHealthy, delta);
+ return delta;
+ } else { // delta == 0
+ // We have exactly the number of healthy replicas needed.
+ return delta;
+ }
+ }
+
+ /**
+ * Return true if the container is sufficiently replicated. Replicas in the
+ * DECOMMISSIONED state are ignored in this check, assuming they will
+ * eventually be removed from the cluster.
+ * This check ignores inflight additions, as those replicas have not yet been
+ * created and the create could fail for some reason.
+ * The check does consider inflight deletes as there may be 3 healthy replicas
+ * now, but once the delete completes it will reduce to 2.
+ * We also assume a replica in Maintenance state cannot be removed, so the
+ * pending delete would affect only the healthy replica count.
+ *
+ * @return True if the container is sufficiently replicated and False
+ * otherwise.
+ */
+ public boolean isSufficientlyReplicated() {
+ return missingReplicas() + inFlightDel <= 0;
+ }
+
+ /**
+ * Return true if the container is over replicated. Decommissioned and
+ * maintenance replicas are ignored for this check.
+ * The check ignores inflight additions, as they may fail, but it does
+ * consider inflight deletes, as they would reduce the over replication when
+ * they complete.
+ *
+ * @return True if the container is over replicated, false otherwise.
+ */
+ public boolean isOverReplicated() {
+ return missingReplicas() + inFlightDel < 0;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 37afd36..58e38a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
@@ -98,6 +99,11 @@ public class ReplicationManager implements MetricsSource {
private final LockManager<ContainerID> lockManager;
/**
+ * Used to lookup the health of a nodes or the nodes operational state.
+ */
+ private final NodeManager nodeManager;
+
+ /**
* This is used for tracking container replication commands which are issued
* by ReplicationManager and not yet complete.
*/
@@ -127,6 +133,11 @@ public class ReplicationManager implements MetricsSource {
private volatile boolean running;
/**
+ * Minimum number of replica in a healthy state for maintenance.
+ */
+ private int minHealthyForMaintenance;
+
+ /**
* Constructs ReplicationManager instance with the given configuration.
*
* @param conf OzoneConfiguration
@@ -138,15 +149,18 @@ public class ReplicationManager implements MetricsSource {
final ContainerManager containerManager,
final ContainerPlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
- final LockManager<ContainerID> lockManager) {
+ final LockManager<ContainerID> lockManager,
+ final NodeManager nodeManager) {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
this.eventPublisher = eventPublisher;
this.lockManager = lockManager;
+ this.nodeManager = nodeManager;
this.conf = conf;
this.running = false;
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
+ this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum();
}
/**
@@ -241,7 +255,7 @@ public class ReplicationManager implements MetricsSource {
* @param id ContainerID
*/
private void processContainer(ContainerID id) {
- lockManager.lock(id);
+ lockManager.writeLock(id);
try {
final ContainerInfo container = containerManager.getContainer(id);
final Set<ContainerReplica> replicas = containerManager
@@ -291,24 +305,15 @@ public class ReplicationManager implements MetricsSource {
action -> replicas.stream()
.noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
-
- /*
- * We don't have to take any action if the container is healthy.
- *
- * According to ReplicationMonitor container is considered healthy if
- * the container is either in QUASI_CLOSED or in CLOSED state and has
- * exact number of replicas in the same state.
- */
- if (isContainerHealthy(container, replicas)) {
- return;
- }
+ ContainerReplicaCount replicaSet =
+ getContainerReplicaCount(container, replicas);
/*
* Check if the container is under replicated and take appropriate
* action.
*/
- if (isContainerUnderReplicated(container, replicas)) {
- handleUnderReplicatedContainer(container, replicas);
+ if (!replicaSet.isSufficientlyReplicated()) {
+ handleUnderReplicatedContainer(container, replicaSet);
return;
}
@@ -316,22 +321,24 @@ public class ReplicationManager implements MetricsSource {
* Check if the container is over replicated and take appropriate
* action.
*/
- if (isContainerOverReplicated(container, replicas)) {
- handleOverReplicatedContainer(container, replicas);
+ if (replicaSet.isOverReplicated()) {
+ handleOverReplicatedContainer(container, replicaSet);
return;
}
/*
- * The container is neither under nor over replicated and the container
- * is not healthy. This means that the container has unhealthy/corrupted
- * replica.
+ If we get here, the container is not over replicated or under replicated
+ but it may be "unhealthy", which means it has one or more replica which
+ are not in the same state as the container itself.
*/
- handleUnstableContainer(container, replicas);
+ if (!isContainerHealthy(container, replicas)) {
+ handleUnstableContainer(container, replicas);
+ }
} catch (ContainerNotFoundException ex) {
LOG.warn("Missing container {}.", id);
} finally {
- lockManager.unlock(id);
+ lockManager.writeUnlock(id);
}
}
@@ -361,7 +368,8 @@ public class ReplicationManager implements MetricsSource {
* Returns true if the container is healthy according to ReplicationMonitor.
*
* According to ReplicationMonitor container is considered healthy if
- * it has exact number of replicas in the same state as the container.
+ * all replica which are not in a decommission or maintenance state are in
+ * the same state as the container and in QUASI_CLOSED or in CLOSED state.
*
* @param container Container to check
* @param replicas Set of ContainerReplicas
@@ -369,50 +377,76 @@ public class ReplicationManager implements MetricsSource {
*/
private boolean isContainerHealthy(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
- return container.getReplicationFactor().getNumber() == replicas.size() &&
- replicas.stream().allMatch(
- r -> compareState(container.getState(), r.getState()));
+ return (container.getState() == LifeCycleState.CLOSED
+ || container.getState() == LifeCycleState.QUASI_CLOSED)
+ && replicas.stream()
+ .filter(r -> r.getState() != State.DECOMMISSIONED)
+ .filter(r -> r.getState() != State.MAINTENANCE)
+ .allMatch(r -> compareState(container.getState(), r.getState()));
}
/**
- * Checks if the container is under replicated or not.
- *
- * @param container Container to check
- * @param replicas Set of ContainerReplicas
- * @return true if the container is under replicated, false otherwise
+ * Returns the number replica which are pending creation for the given
+ * container ID.
+ * @param id The ContainerID for which to check the pending replica
+ * @return The number of inflight additions or zero if none
*/
- private boolean isContainerUnderReplicated(final ContainerInfo container,
- final Set<ContainerReplica> replicas) {
- return container.getReplicationFactor().getNumber() >
- getReplicaCount(container.containerID(), replicas);
+ private int getInflightAdd(final ContainerID id) {
+ return inflightReplication.getOrDefault(id, Collections.emptyList()).size();
}
/**
- * Checks if the container is over replicated or not.
- *
- * @param container Container to check
- * @param replicas Set of ContainerReplicas
- * @return true if the container if over replicated, false otherwise
+ * Returns the number replica which are pending delete for the given
+ * container ID.
+ * @param id The ContainerID for which to check the pending replica
+ * @return The number of inflight deletes or zero if none
*/
- private boolean isContainerOverReplicated(final ContainerInfo container,
- final Set<ContainerReplica> replicas) {
- return container.getReplicationFactor().getNumber() <
- getReplicaCount(container.containerID(), replicas);
+ private int getInflightDel(final ContainerID id) {
+ return inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
}
/**
- * Returns the replication count of the given container. This also
- * considers inflight replication and deletion.
+ * Given a container, obtain the set of known replica for it, and return a
+ * ContainerReplicaCount object. This object will contain the set of replica
+ * as well as all information required to determine if the container is over
+ * or under replicated, including the delta of replica required to repair the
+ * over or under replication.
*
- * @param id ContainerID
- * @param replicas Set of existing replicas
- * @return number of estimated replicas for this container
- */
- private int getReplicaCount(final ContainerID id,
- final Set<ContainerReplica> replicas) {
- return replicas.size()
- + inflightReplication.getOrDefault(id, Collections.emptyList()).size()
- - inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+ * @param container The container to create a ContainerReplicaCount for
+ * @return ContainerReplicaCount representing the replicated state of the
+ * container.
+ * @throws ContainerNotFoundException
+ */
+ public ContainerReplicaCount getContainerReplicaCount(ContainerInfo container)
+ throws ContainerNotFoundException {
+ lockManager.readLock(container.containerID());
+ try {
+ final Set<ContainerReplica> replica = containerManager
+ .getContainerReplicas(container.containerID());
+ return getContainerReplicaCount(container, replica);
+ } finally {
+ lockManager.readUnlock(container.containerID());
+ }
+ }
+
+ /**
+ * Given a container and its set of replicas, create a ContainerReplicaCount
+ * using the tracked inflight adds/deletes and minHealthyForMaintenance.
+ *
+ * @param container The container for which to construct a
+ * ContainerReplicaCount
+ * @param replica The set of existing replicas for this container
+ * @return ContainerReplicaCount representing the current state of the
+ * container
+ */
+ private ContainerReplicaCount getContainerReplicaCount(
+ ContainerInfo container, Set<ContainerReplica> replica) {
+ return new ContainerReplicaCount(
+ replica,
+ getInflightAdd(container.containerID()),
+ getInflightDel(container.containerID()),
+ container.getReplicationFactor().getNumber(),
+ minHealthyForMaintenance);
}
/**
@@ -478,13 +512,29 @@ public class ReplicationManager implements MetricsSource {
* and send replicate container command to the identified datanode(s).
*
* @param container ContainerInfo
- * @param replicas Set of ContainerReplicas
+ * @param replicaSet An instance of ContainerReplicaCount, containing the
+ * current replica count and inflight adds and deletes
*/
private void handleUnderReplicatedContainer(final ContainerInfo container,
- final Set<ContainerReplica> replicas) {
- LOG.debug("Handling underreplicated container: {}",
+ final ContainerReplicaCount replicaSet) {
+ LOG.debug("Handling under replicated container: {}",
container.getContainerID());
+ Set<ContainerReplica> replicas = replicaSet.getReplica();
try {
+
+ if (replicaSet.isSufficientlyReplicated()) {
+ LOG.info("The container {} with replicas {} is sufficiently "+
+ "replicated", container.getContainerID(), replicaSet);
+ return;
+ }
+ int repDelta = replicaSet.additionalReplicaNeeded();
+ if (repDelta <= 0) {
+ LOG.info("The container {} with {} is not sufficiently " +
+ "replicated but no further replicas will be scheduled until "+
+ "in-flight operations complete",
+ container.getContainerID(), replicaSet);
+ return;
+ }
final ContainerID id = container.containerID();
final List<DatanodeDetails> deletionInFlight = inflightDeletion
.getOrDefault(id, Collections.emptyList())
@@ -494,15 +544,19 @@ public class ReplicationManager implements MetricsSource {
final List<DatanodeDetails> source = replicas.stream()
.filter(r ->
r.getState() == State.QUASI_CLOSED ||
- r.getState() == State.CLOSED)
+ r.getState() == State.CLOSED ||
+ r.getState() == State.DECOMMISSIONED ||
+ r.getState() == State.MAINTENANCE)
+ // Exclude stale and dead nodes. This is particularly important for
+ // maintenance nodes, as the replicas will remain present in the
+ // container manager, even when they go dead.
+ .filter(r ->
+ nodeManager.getNodeStatus(r.getDatanodeDetails()).isHealthy())
.filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
if (source.size() > 0) {
- final int replicationFactor = container
- .getReplicationFactor().getNumber();
- final int delta = replicationFactor - getReplicaCount(id, replicas);
final List<DatanodeDetails> excludeList = replicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
@@ -511,13 +565,14 @@ public class ReplicationManager implements MetricsSource {
actionList.stream().map(r -> r.datanode)
.forEach(excludeList::add);
}
+ // At this point we have all live source nodes and we have consider
final List<DatanodeDetails> selectedDatanodes = containerPlacement
- .chooseDatanodes(excludeList, null, delta,
+ .chooseDatanodes(excludeList, null, repDelta,
container.getUsedBytes());
LOG.info("Container {} is under replicated. Expected replica count" +
- " is {}, but found {}.", id, replicationFactor,
- replicationFactor - delta);
+ " is {}, but found {}. An additional {} replica are needed",
+ id, replicaSet.getReplicationFactor(), replicaSet, repDelta);
for (DatanodeDetails datanode : selectedDatanodes) {
sendReplicateCommand(container, datanode, source);
@@ -538,17 +593,16 @@ public class ReplicationManager implements MetricsSource {
* identified datanode(s).
*
* @param container ContainerInfo
- * @param replicas Set of ContainerReplicas
+ * @param replicaSet An instance of ContainerReplicaCount, containing the
+ * current replica count and inflight adds and deletes
*/
private void handleOverReplicatedContainer(final ContainerInfo container,
- final Set<ContainerReplica> replicas) {
+ final ContainerReplicaCount replicaSet) {
+ final Set<ContainerReplica> replicas = replicaSet.getReplica();
final ContainerID id = container.containerID();
final int replicationFactor = container.getReplicationFactor().getNumber();
- // Dont consider inflight replication while calculating excess here.
- final int excess = replicas.size() - replicationFactor -
- inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
-
+ final int excess = replicaSet.additionalReplicaNeeded() * -1;
if (excess > 0) {
LOG.info("Container {} is over replicated. Expected replica count" +
@@ -566,6 +620,11 @@ public class ReplicationManager implements MetricsSource {
// Retain one healthy replica per origin node Id.
final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
eligibleReplicas.removeAll(uniqueReplicas.values());
+ // Replica which are maintenance or decommissioned are not eligible to
+ // be removed, as they do not count toward over-replication and they also
+ // many not be available
+ eligibleReplicas.removeIf(r -> (r.getState() == State.MAINTENANCE
+ || r.getState() == State.DECOMMISSIONED));
final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
.stream()
@@ -801,6 +860,12 @@ public class ReplicationManager implements MetricsSource {
*/
private long eventTimeout = 10 * 60 * 1000;
+ /**
+ * The number of container replica which must be available for a node to
+ * enter maintenance.
+ */
+ private int maintenanceReplicaMinimum = 2;
+
@Config(key = "thread.interval",
type = ConfigType.TIME,
defaultValue = "300s",
@@ -825,6 +890,19 @@ public class ReplicationManager implements MetricsSource {
this.eventTimeout = eventTimeout;
}
+ @Config(key = "maintenance.replica.minimum",
+ type = ConfigType.INT,
+ defaultValue = "2",
+ tags = {SCM, OZONE},
+ description = "The minimum number of container replicas which must " +
+ "be available for a node to enter maintenance. If putting a " +
+ "node into maintenance reduces the available replicas for any " +
+ "container below this level, the node will remain in the " +
+ "entering maintenance state until a new replica is created.")
+ public void setMaintenanceReplicaMinimum(int replicaCount) {
+ this.maintenanceReplicaMinimum = replicaCount;
+ }
+
public long getInterval() {
return interval;
}
@@ -832,6 +910,10 @@ public class ReplicationManager implements MetricsSource {
public long getEventTimeout() {
return eventTimeout;
}
+ /** @return the configured maintenance.replica.minimum value. */
+ public int getMaintenanceReplicaMinimum() {
+ return maintenanceReplicaMinimum;
+ }
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index 0776c28..5c1adf7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -61,6 +61,75 @@ public class NodeStatus {
return operationalState;
}
+ /**
+ * Returns true if the node is in any decommission state, i.e. its
+ * operational state is DECOMMISSIONING or DECOMMISSIONED.
+ *
+ * @return True if the node is in any decommission state, false otherwise
+ */
+ public boolean isDecommission() {
+ return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONING
+ || operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONED;
+ }
+
+ /**
+ * Returns true if the node's operational state is DECOMMISSIONING.
+ *
+ * @return True if the node is decommissioning, false otherwise
+ */
+ public boolean isDecommissioning() {
+ return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONING;
+ }
+
+ /**
+ * Returns true if the node's operational state is DECOMMISSIONED.
+ *
+ * @return True if the node is decommissioned, false otherwise
+ */
+ public boolean isDecommissioned() {
+ return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONED;
+ }
+
+ /**
+ * Returns true if the node is ENTERING_MAINTENANCE or IN_MAINTENANCE.
+ *
+ * @return True if the node is in any maintenance state, false otherwise
+ */
+ public boolean isMaintenance() {
+ return operationalState
+ == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE
+ || operationalState == HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+ }
+
+ /**
+ * Returns true if the node's operational state is ENTERING_MAINTENANCE.
+ *
+ * @return True if the node is entering maintenance, false otherwise
+ */
+ public boolean isEnteringMaintenance() {
+ return operationalState
+ == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+ }
+
+ /**
+ * Returns true if the node's operational state is IN_MAINTENANCE.
+ *
+ * @return True if the node is in maintenance, false otherwise.
+ */
+ public boolean isInMaintenance() {
+ return operationalState == HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+ }
+
+ /**
+ * Returns true if the node's health is HEALTHY; any other state (for example
+ * stale or dead) returns false.
+ *
+ * @return True if the node is Healthy, false otherwise
+ */
+ public boolean isHealthy() {
+ return health == HddsProtos.NodeState.HEALTHY;
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 4df38a3..d285e19 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -434,7 +434,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
containerManager,
containerPlacementPolicy,
eventQueue,
- new LockManager<>(conf));
+ new LockManager<>(conf),
+ scmNodeManager);
}
if(configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 1631447..158b1bd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -21,21 +21,36 @@ package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager
+ .ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lock.LockManager;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -69,12 +84,16 @@ public class TestReplicationManager {
private ContainerPlacementPolicy containerPlacementPolicy;
private EventQueue eventQueue;
private DatanodeCommandHandler datanodeCommandHandler;
+ private SimpleNodeManager nodeManager;
+ private ContainerManager containerManager;
+ private Configuration conf;
@Before
public void setup() throws IOException, InterruptedException {
- final Configuration conf = new OzoneConfiguration();
- final ContainerManager containerManager =
+ conf = new OzoneConfiguration();
+ containerManager =
Mockito.mock(ContainerManager.class);
+ nodeManager = new SimpleNodeManager();
eventQueue = new EventQueue();
containerStateManager = new ContainerStateManager(conf);
@@ -106,12 +125,27 @@ public class TestReplicationManager {
.collect(Collectors.toList());
});
+ // Tests needing a non-default config call createReplicationManager().
replicationManager = new ReplicationManager(
new ReplicationManagerConfiguration(),
containerManager,
containerPlacementPolicy,
eventQueue,
- new LockManager<>(conf));
+ new LockManager<>(conf),
+ nodeManager);
+ replicationManager.start();
+ Thread.sleep(100L);
+ }
+
+ private void createReplicationManager(ReplicationManagerConfiguration rmConf)
+ throws InterruptedException {
+ replicationManager = new ReplicationManager(
+ rmConf,
+ containerManager,
+ containerPlacementPolicy,
+ eventQueue,
+ new LockManager<>(conf),
+ nodeManager);
replicationManager.start();
Thread.sleep(100L);
}
@@ -606,6 +640,213 @@ public class TestReplicationManager {
}
+ /**
+ * ReplicationManager should replicate an additional replica if there are
+ * decommissioned replicas.
+ */
+ @Test
+ public void testUnderReplicatedDueToDecommission() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.CLOSED, State.DECOMMISSIONED, State.DECOMMISSIONED);
+ assertReplicaScheduled(2);
+ }
+
+ /**
+ * ReplicationManager should replicate an additional replica when all copies
+ * are decommissioning.
+ */
+ @Test
+ public void testUnderReplicatedDueToAllDecommission() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.DECOMMISSIONED, State.DECOMMISSIONED, State.DECOMMISSIONED);
+ assertReplicaScheduled(3);
+ }
+
+ /**
+ * ReplicationManager should not take any action when the container is
+ * correctly replicated with decommissioned replicas still present.
+ */
+ @Test
+ public void testCorrectlyReplicatedWithDecommission() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.CLOSED, State.CLOSED, State.CLOSED, State.DECOMMISSIONED);
+ assertReplicaScheduled(0);
+ }
+
+ /**
+ * ReplicationManager should replicate an additional replica when min rep
+ * is not met for maintenance.
+ */
+ @Test
+ public void testUnderReplicatedDueToMaintenance() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.CLOSED, State.MAINTENANCE, State.MAINTENANCE);
+ assertReplicaScheduled(1);
+ }
+
+ /**
+ * ReplicationManager should not replicate an additional replica if the
+ * min replica for maintenance is 1 and another replica is available.
+ */
+ @Test
+ public void testNotUnderReplicatedDueToMaintenanceMinRepOne() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ replicationManager.stop();
+ ReplicationManagerConfiguration newConf =
+ new ReplicationManagerConfiguration();
+ newConf.setMaintenanceReplicaMinimum(1);
+ createReplicationManager(newConf);
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.CLOSED, State.MAINTENANCE, State.MAINTENANCE);
+ assertReplicaScheduled(0);
+ }
+
+ /**
+ * ReplicationManager should replicate an additional replica when all copies
+ * are going offline and min rep is 1.
+ */
+ @Test
+ public void testUnderReplicatedDueToMaintenanceMinRepOne() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ replicationManager.stop();
+ ReplicationManagerConfiguration newConf =
+ new ReplicationManagerConfiguration();
+ newConf.setMaintenanceReplicaMinimum(1);
+ createReplicationManager(newConf);
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.MAINTENANCE, State.MAINTENANCE, State.MAINTENANCE);
+ assertReplicaScheduled(1);
+ }
+
+ /**
+ * ReplicationManager should replicate additional replica when all copies
+ * are going into maintenance.
+ */
+ @Test
+ public void testUnderReplicatedDueToAllMaintenance() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.MAINTENANCE, State.MAINTENANCE, State.MAINTENANCE);
+ assertReplicaScheduled(2);
+ }
+
+ /**
+ * ReplicationManager should not replicate an additional replica when
+ * sufficient replicas are available.
+ */
+ @Test
+ public void testCorrectlyReplicatedWithMaintenance() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.CLOSED, State.CLOSED, State.MAINTENANCE, State.MAINTENANCE);
+ assertReplicaScheduled(0);
+ }
+
+ /**
+ * ReplicationManager should replicate additional replica when all copies
+ * are decommissioning or maintenance.
+ */
+ @Test
+ public void testUnderReplicatedWithDecommissionAndMaintenance() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.DECOMMISSIONED, State.DECOMMISSIONED, State.MAINTENANCE,
+ State.MAINTENANCE);
+ assertReplicaScheduled(2);
+ }
+
+ /**
+ * When a CLOSED container is over replicated, ReplicationManager
+ * deletes the excess replicas. While choosing the replica for deletion
+ * ReplicationManager should not attempt to remove a DECOMMISSION or
+ * MAINTENANCE replica.
+ */
+ @Test
+ public void testOverReplicatedClosedContainerWithDecomAndMaint()
+ throws SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ State.DECOMMISSIONED, State.MAINTENANCE,
+ State.CLOSED, State.CLOSED, State.CLOSED, State.CLOSED);
+
+ final int currentDeleteCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ // Get the DECOM and Maint replica and ensure none of them are scheduled
+ // for removal
+ Set<ContainerReplica> decom =
+ containerStateManager.getContainerReplicas(container.containerID())
+ .stream()
+ .filter(r -> r.getState() != State.CLOSED)
+ .collect(Collectors.toSet());
+ for (ContainerReplica r : decom) {
+ Assert.assertFalse(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
+ r.getDatanodeDetails()));
+ }
+ }
+
+ /**
+ * Replication Manager should not attempt to replicate from an unhealthy
+ * (stale or dead) node. To test this, set up a scenario where a replica needs
+ * to be created, but mark all nodes stale. That way, no new replica will be
+ * scheduled.
+ */
+ @Test
+ public void testUnderReplicatedNotHealthySource()
+ throws SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+ NodeStatus.inServiceStale(),
+ State.CLOSED, State.DECOMMISSIONED, State.DECOMMISSIONED);
+ // A replica would normally be scheduled here, but as all nodes are stale,
+ // nothing gets scheduled.
+ assertReplicaScheduled(0);
+ }
+
+ private ContainerInfo setupReplicas(
+ LifeCycleState containerState, State... states)
+ throws SCMException, ContainerNotFoundException {
+ return setupReplicas(containerState, NodeStatus.inServiceHealthy(), states);
+ }
+
+ private ContainerInfo setupReplicas(
+ LifeCycleState containerState, NodeStatus allNodesStatus, State... states)
+ throws SCMException, ContainerNotFoundException {
+ final ContainerInfo container = getContainer(containerState);
+ final ContainerID id = container.containerID();
+ containerStateManager.loadContainer(container);
+ final UUID originNodeId = UUID.randomUUID();
+
+ for (State s : states) {
+ DatanodeDetails dn = randomDatanodeDetails();
+ nodeManager.register(dn, allNodesStatus);
+ final ContainerReplica replica = getReplicas(
+ id, s, 1000L, originNodeId, dn);
+ containerStateManager.updateContainerReplica(id, replica);
+ }
+ return container;
+ }
+
+ private void assertReplicaScheduled(int delta) throws InterruptedException {
+ final int currentReplicateCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentReplicateCommandCount + delta,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ }
+
@After
public void teardown() throws IOException {
containerStateManager.close();
@@ -659,4 +900,180 @@ public class TestReplicationManager {
}
}
+ private static class SimpleNodeManager implements NodeManager {
+
+ private final Map<UUID, DatanodeInfo> nodeMap = new HashMap<>();
+
+ public void register(DatanodeDetails dd, NodeStatus status) {
+ nodeMap.put(dd.getUuid(), new DatanodeInfo(dd, status));
+ }
+
+ /**
+ * If the given node was registered with the nodeManager, return the
+ * NodeStatus for the node. Otherwise return a NodeStatus of "In Service
+ * and Healthy".
+ * @param datanodeDetails DatanodeDetails
+ * @return The NodeStatus of the node if it is registered, otherwise an
+ * In Service and Healthy NodeStatus.
+ */
+ @Override
+ public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) {
+ DatanodeInfo dni = nodeMap.get(datanodeDetails.getUuid());
+ if (dni != null) {
+ return dni.getNodeStatus();
+ } else {
+ return NodeStatus.inServiceHealthy();
+ }
+ }
+
+ /**
+ * Below here are all auto-generated placeholder methods to implement the
+ * interface.
+ */
+ @Override
+ public List<DatanodeDetails> getNodes(NodeStatus nodeStatus) {
+ return null;
+ }
+
+ @Override
+ public List<DatanodeDetails> getNodes(
+ HddsProtos.NodeOperationalState opState, HddsProtos.NodeState health) {
+ return null;
+ }
+
+ @Override
+ public int getNodeCount(NodeStatus nodeStatus) {
+ return 0;
+ }
+
+ @Override
+ public int getNodeCount(HddsProtos.NodeOperationalState opState,
+ HddsProtos.NodeState health) {
+ return 0;
+ }
+
+ @Override
+ public List<DatanodeDetails> getAllNodes() {
+ return null;
+ }
+
+ @Override
+ public SCMNodeStat getStats() {
+ return null;
+ }
+
+ @Override
+ public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
+ return null;
+ }
+
+ @Override
+ public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
+ return null;
+ }
+
+ @Override
+ public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+ HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+ }
+
+ @Override
+ public Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails) {
+ return null;
+ }
+
+ @Override
+ public void addPipeline(Pipeline pipeline) {
+ }
+
+ @Override
+ public void removePipeline(Pipeline pipeline) {
+ }
+
+ @Override
+ public void addContainer(DatanodeDetails datanodeDetails,
+ ContainerID containerId) throws NodeNotFoundException {
+ }
+
+ @Override
+ public void setContainers(DatanodeDetails datanodeDetails,
+ Set<ContainerID> containerIds) throws NodeNotFoundException {
+ }
+
+ @Override
+ public Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
+ throws NodeNotFoundException {
+ return null;
+ }
+
+ @Override
+ public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+ }
+
+ @Override
+ public void processNodeReport(DatanodeDetails datanodeDetails,
+ StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport) {
+ }
+
+ @Override
+ public List<SCMCommand> getCommandQueue(UUID dnID) {
+ return null;
+ }
+
+ @Override
+ public DatanodeDetails getNodeByUuid(String uuid) {
+ return null;
+ }
+
+ @Override
+ public List<DatanodeDetails> getNodesByAddress(String address) {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public Map<String, Integer> getNodeCount() {
+ return null;
+ }
+
+ @Override
+ public Map<String, Long> getNodeInfo() {
+ return null;
+ }
+
+ @Override
+ public void onMessage(CommandForDatanode commandForDatanode,
+ EventPublisher publisher) {
+ }
+
+ @Override
+ public VersionResponse getVersion(
+ StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto
+ versionRequest) {
+ return null;
+ }
+
+ @Override
+ public RegisteredCommand register(DatanodeDetails datanodeDetails,
+ StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport,
+ StorageContainerDatanodeProtocolProtos.PipelineReportsProto
+ pipelineReport) {
+ return null;
+ }
+
+ @Override
+ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
+ return null;
+ }
+
+ @Override
+ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
+ return null;
+ }
+ }
+
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index 003035c..abf7f9f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -47,17 +47,11 @@ import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import org.hamcrest.MatcherAssert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import static org.mockito.Matchers.anyObject;
-import org.mockito.Mockito;
import static org.mockito.Mockito.when;
/**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java
new file mode 100644
index 0000000..9a50232
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.states;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.*;
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State
+ .DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State
+ .MAINTENANCE;
+
+/**
+ * Class used to test the ContainerReplicaCount class.
+ */
+public class TestContainerReplicaCount {
+
+ @Before
+ public void setup() {
+ }
+
+ @Test
+ public void testThreeHealthyReplica() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ @Test
+ public void testTwoHealthyReplica() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testOneHealthyReplica() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, false, 2, false);
+ }
+
+ @Test
+ public void testTwoHealthyAndInflightAdd() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+ validate(rcnt, false, 0, false);
+ }
+
+ /**
+ * This does not schedule a container to be removed, as the inFlight add may
+ * fail and then the delete would make things under-replicated. Once the add
+ * completes there will be 4 healthy and it will get taken care of then.
+ */
+ @Test
+ public void testThreeHealthyAndInflightAdd() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ /**
+ * The inflight delete may fail, but if it succeeds the container will be
+ * under-replicated, so we go ahead and schedule another replica to be added.
+ */
+ @Test
+ public void testThreeHealthyAndInflightDelete() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 3, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ /**
+ * This is NOT sufficiently replicated as the inflight add may fail and the
+ * inflight del could succeed, leaving only 2 healthy replicas.
+ */
+ @Test
+ public void testThreeHealthyAndInflightAddAndInFlightDelete() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 1, 3, 2);
+ validate(rcnt, false, 0, false);
+ }
+
+ @Test
+ public void testFourHealthyReplicas() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, CLOSED, CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, true, -1, true);
+ }
+
+ @Test
+ public void testFourHealthyReplicasAndInFlightDelete() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, CLOSED, CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 3, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ @Test
+ public void testFourHealthyReplicasAndTwoInFlightDelete() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, CLOSED, CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 2, 3, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testOneHealthyReplicaRepFactorOne() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ @Test
+ public void testOneHealthyReplicaRepFactorOneInFlightDelete() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 1, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testTwoHealthyReplicaTwoInflightAdd() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 2, 0, 3, 2);
+ validate(rcnt, false, 0, false);
+ }
+
+ /**
+ * From here consider decommission replicas.
+ */
+
+ @Test
+ public void testThreeHealthyAndTwoDecommission() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED,
+ DECOMMISSIONED, DECOMMISSIONED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ @Test
+ public void testOneDecommissionedReplica() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, CLOSED, DECOMMISSIONED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testTwoHealthyOneDecommissionedneInFlightAdd() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, CLOSED, DECOMMISSIONED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+ validate(rcnt, false, 0, false);
+ }
+
+ @Test
+ public void testAllDecommissioned() {
+ Set<ContainerReplica> replica =
+ registerNodes(DECOMMISSIONED, DECOMMISSIONED, DECOMMISSIONED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, false, 3, false);
+ }
+
+ @Test
+ public void testAllDecommissionedRepFactorOne() {
+ Set<ContainerReplica> replica = registerNodes(DECOMMISSIONED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testAllDecommissionedRepFactorOneInFlightAdd() {
+ Set<ContainerReplica> replica = registerNodes(DECOMMISSIONED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 1, 2);
+ validate(rcnt, false, 0, false);
+ }
+
+ @Test
+ public void testOneHealthyOneDecommissioningRepFactorOne() {
+ Set<ContainerReplica> replica = registerNodes(DECOMMISSIONED, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ /**
+ * Maintenance tests from here.
+ */
+
+ @Test
+ public void testOneHealthyTwoMaintenanceMinRepOfTwo() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, MAINTENANCE, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testOneHealthyThreeMaintenanceMinRepOfTwo() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED,
+ MAINTENANCE, MAINTENANCE, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testOneHealthyTwoMaintenanceMinRepOfOne() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, MAINTENANCE, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 1);
+ validate(rcnt, true, 0, false);
+ }
+
+ @Test
+ public void testOneHealthyThreeMaintenanceMinRepOfTwoInFlightAdd() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED,
+ MAINTENANCE, MAINTENANCE, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+ validate(rcnt, false, 0, false);
+ }
+
+ @Test
+ public void testAllMaintenance() {
+ Set<ContainerReplica> replica =
+ registerNodes(MAINTENANCE, MAINTENANCE, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, false, 2, false);
+ }
+
+ /**
+ * As we have exactly 3 healthy, but then an excess of maintenance copies
+ * we ignore the over-replication caused by the maintenance copies until they
+ * come back online, and then deal with them.
+ */
+ @Test
+ public void testThreeHealthyTwoInMaintenance() {
+ Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED,
+ MAINTENANCE, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ /**
+ * This is somewhat similar to testThreeHealthyTwoInMaintenance() except now
+ * one of the maintenance copies has become healthy and we will need to remove
+ * the over-replicated healthy container.
+ */
+ @Test
+ public void testFourHealthyOneInMaintenance() {
+ Set<ContainerReplica> replica =
+ registerNodes(CLOSED, CLOSED, CLOSED, CLOSED, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+ validate(rcnt, true, -1, true);
+ }
+
+ @Test
+ public void testOneMaintenanceMinRepOfTwoRepFactorOne() {
+ Set<ContainerReplica> replica = registerNodes(MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ @Test
+ public void testOneMaintenanceMinRepOfTwoRepFactorOneInFlightAdd() {
+ Set<ContainerReplica> replica = registerNodes(MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 1, 2);
+ validate(rcnt, false, 0, false);
+ }
+
+ @Test
+ public void testOneHealthyOneMaintenanceRepFactorOne() {
+ Set<ContainerReplica> replica = registerNodes(MAINTENANCE, CLOSED);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+ validate(rcnt, true, 0, false);
+ }
+
+ @Test
+ public void testTwoDecomTwoMaintenanceOneInflightAdd() {
+ Set<ContainerReplica> replica =
+ registerNodes(DECOMMISSIONED, DECOMMISSIONED, MAINTENANCE, MAINTENANCE);
+ ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+ validate(rcnt, false, 1, false);
+ }
+
+ private void validate(ContainerReplicaCount rcnt,
+ boolean sufficientlyReplicated, int replicaDelta, boolean overReplicated) {
+ assertEquals(sufficientlyReplicated, rcnt.isSufficientlyReplicated());
+ assertEquals(replicaDelta, rcnt.additionalReplicaNeeded());
+ }
+
+ private Set<ContainerReplica> registerNodes(
+ ContainerReplicaProto.State... states) {
+ Set<ContainerReplica> replica = new HashSet<>();
+ for (ContainerReplicaProto.State s : states) {
+ DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+ replica.add(new ContainerReplica.ContainerReplicaBuilder()
+ .setContainerID(new ContainerID(1))
+ .setContainerState(s)
+ .setDatanodeDetails(dn)
+ .setOriginNodeId(dn.getUuid())
+ .setSequenceId(1)
+ .build());
+ }
+ return replica;
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 0aa0221..acd8993 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -127,7 +128,9 @@ public class TestDatanodeAdminMonitor {
}
+
@Test
+ @Ignore // HDDS-2631
public void testMonitoredNodeHasPipelinesClosed()
throws NodeNotFoundException, TimeoutException, InterruptedException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
index 5572e9a..a2587a7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
@@ -54,6 +56,7 @@ public class TestSafeModeHandler {
private EventQueue eventQueue;
private SCMSafeModeManager.SafeModeStatus safeModeStatus;
private PipelineManager scmPipelineManager;
+ private NodeManager nodeManager;
public void setup(boolean enabled) {
configuration = new OzoneConfiguration();
@@ -68,10 +71,11 @@ public class TestSafeModeHandler {
Mockito.mock(ContainerManager.class);
Mockito.when(containerManager.getContainerIDs())
.thenReturn(new HashSet<>());
+ nodeManager = new MockNodeManager(false, 0);
replicationManager = new ReplicationManager(
new ReplicationManagerConfiguration(),
containerManager, Mockito.mock(ContainerPlacementPolicy.class),
- eventQueue, new LockManager(configuration));
+ eventQueue, new LockManager(configuration), nodeManager);
scmPipelineManager = Mockito.mock(SCMPipelineManager.class);
blockManager = Mockito.mock(BlockManagerImpl.class);
safeModeHandler =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 5cf0864..159683c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org