You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2019/03/22 21:36:36 UTC
[hadoop] branch trunk updated: HDDS-1205. Refactor
ReplicationManager to handle QUASI_CLOSED containers. Contributed by Nanda
kumar. (#620)
This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new f854a89 HDDS-1205. Refactor ReplicationManager to handle QUASI_CLOSED containers. Contributed by Nanda kumar. (#620)
f854a89 is described below
commit f854a89190bd2453ccb1bfaa123d63d546e913cd
Author: Arpit Agarwal <ar...@users.noreply.github.com>
AuthorDate: Fri Mar 22 14:36:29 2019 -0700
HDDS-1205. Refactor ReplicationManager to handle QUASI_CLOSED containers. Contributed by Nanda kumar. (#620)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 12 +
.../common/src/main/resources/ozone-default.xml | 20 +
.../hdds/scm/container/ContainerManager.java | 8 +
.../hdds/scm/container/ReplicationManager.java | 748 +++++++++++++++++++++
.../hdds/scm/container/SCMContainerManager.java | 10 +
.../scm/container/states/ContainerStateMap.java | 9 +-
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 53 +-
.../scm/container/TestContainerReportHandler.java | 8 +-
.../scm/container/TestContainerReportHelper.java | 40 --
.../TestIncrementalContainerReportHandler.java | 6 +-
.../hdds/scm/container/TestReplicationManager.java | 625 +++++++++++++++++
11 files changed, 1485 insertions(+), 54 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 4e197d3..3b45b89 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -348,6 +348,18 @@ public final class ScmConfigKeys {
public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
"10m";
+ public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL =
+ "hdds.scm.replication.thread.interval";
+
+ public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT =
+ "5m";
+
+ public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT =
+ "hdds.scm.replication.event.timeout";
+
+ public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT =
+ "10m";
+
public static final String
HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY =
"hdds.scm.http.kerberos.principal";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 462a07b..9fd4ef3 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2357,4 +2357,24 @@
Request to flush the OM DB before taking checkpoint snapshot.
</description>
</property>
+ <property>
+ <name>hdds.scm.replication.thread.interval</name>
+ <value>5m</value>
+ <tag>OZONE, SCM</tag>
+ <description>
+ There is a replication monitor thread running inside SCM which
+ takes care of replicating the containers in the cluster. This
+ property is used to configure the interval in which that thread
+ runs.
+ </description>
+ </property>
+ <property>
+ <name>hdds.scm.replication.event.timeout</name>
+ <value>10m</value>
+ <tag>OZONE, SCM</tag>
+ <description>
+ Timeout for the container replication/deletion commands sent
+ to datanodes. After this timeout the command will be retried.
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index b2fe4b4..717d58d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -34,6 +34,14 @@ import java.util.Set;
*/
public interface ContainerManager extends Closeable {
+
+ /**
+ * Returns all the container Ids managed by ContainerManager.
+ *
+ * @return Set of ContainerID
+ */
+ Set<ContainerID> getContainerIDs();
+
/**
* Returns all the containers managed by ContainerManager.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
new file mode 100644
index 0000000..97c600b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -0,0 +1,748 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.GeneratedMessage;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.lock.LockManager;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Replication Manager (RM) is the one which is responsible for making sure
+ * that the containers are properly replicated. Replication Manager deals only
+ * with Quasi Closed / Closed container.
+ */
+public class ReplicationManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReplicationManager.class);
+
+ /**
+ * Reference to the ContainerManager.
+ */
+ private final ContainerManager containerManager;
+
+ /**
+ * PlacementPolicy which is used to identify where a container
+ * should be replicated.
+ */
+ private final ContainerPlacementPolicy containerPlacement;
+
+ /**
+ * EventPublisher to fire Replicate and Delete container events.
+ */
+ private final EventPublisher eventPublisher;
+
+ /**
+ * Used for locking a container using its ID while processing it.
+ */
+ private final LockManager<ContainerID> lockManager;
+
+ /**
+ * This is used for tracking container replication commands which are issued
+ * by ReplicationManager and not yet complete.
+ */
+ private final Map<ContainerID, List<InflightAction>> inflightReplication;
+
+ /**
+ * This is used for tracking container deletion commands which are issued
+ * by ReplicationManager and not yet complete.
+ */
+ private final Map<ContainerID, List<InflightAction>> inflightDeletion;
+
+ /**
+ * ReplicationMonitor thread is the one which wakes up at configured
+ * interval and processes all the containers.
+ */
+ private final Thread replicationMonitor;
+
+ /**
+ * The frequency in which ReplicationMonitor thread should run.
+ */
+ private final long interval;
+
+ /**
+ * Timeout for container replication & deletion command issued by
+ * ReplicationManager.
+ */
+ private final long eventTimeout;
+
+ /**
+ * Flag used for checking if the ReplicationMonitor thread is running or
+ * not.
+ */
+ private volatile boolean running;
+
+ /**
+ * Constructs ReplicationManager instance with the given configuration.
+ *
+ * @param conf OzoneConfiguration
+ * @param containerManager ContainerManager
+ * @param containerPlacement ContainerPlacementPolicy
+ * @param eventPublisher EventPublisher
+ */
+ public ReplicationManager(final Configuration conf,
+ final ContainerManager containerManager,
+ final ContainerPlacementPolicy containerPlacement,
+ final EventPublisher eventPublisher) {
+ this.containerManager = containerManager;
+ this.containerPlacement = containerPlacement;
+ this.eventPublisher = eventPublisher;
+ this.lockManager = new LockManager<>(conf);
+ this.inflightReplication = new HashMap<>();
+ this.inflightDeletion = new HashMap<>();
+ this.replicationMonitor = new Thread(this::run);
+ this.replicationMonitor.setName("ReplicationMonitor");
+ this.replicationMonitor.setDaemon(true);
+ this.interval = conf.getTimeDuration(
+ ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL,
+ ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.eventTimeout = conf.getTimeDuration(
+ ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT,
+ ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.running = false;
+ }
+
+ /**
+ * Starts Replication Monitor thread.
+ */
+ public synchronized void start() {
+ if (!running) {
+ LOG.info("Starting Replication Monitor Thread.");
+ running = true;
+ replicationMonitor.start();
+ } else {
+ LOG.info("Replication Monitor Thread is already running.");
+ }
+ }
+
+ /**
+ * Process all the containers immediately.
+ */
+ @VisibleForTesting
+ @SuppressFBWarnings(value="NN_NAKED_NOTIFY",
+ justification="Used only for testing")
+ synchronized void processContainersNow() {
+ notify();
+ }
+
+ /**
+ * Stops Replication Monitor thread.
+ */
+ public synchronized void stop() {
+ if (running) {
+ LOG.info("Stopping Replication Monitor Thread.");
+ running = false;
+ notify();
+ } else {
+ LOG.info("Replication Monitor Thread is not running.");
+ }
+ }
+
+ /**
+ * ReplicationMonitor thread runnable. This wakes up at configured
+ * interval and processes all the containers in the system.
+ */
+ private synchronized void run() {
+ try {
+ while (running) {
+ final long start = Time.monotonicNow();
+ final Set<ContainerID> containerIds =
+ containerManager.getContainerIDs();
+ containerIds.forEach(this::processContainer);
+
+ LOG.info("Replication Monitor Thread took {} milliseconds for" +
+ " processing {} containers.", Time.monotonicNow() - start,
+ containerIds.size());
+
+ wait(interval);
+ }
+ } catch (Throwable t) {
+ // When we get runtime exception, we should terminate SCM.
+ LOG.error("Exception in Replication Monitor Thread.", t);
+ ExitUtil.terminate(1, t);
+ }
+ }
+
+ /**
+ * Process the given container.
+ *
+ * @param id ContainerID
+ */
+ private void processContainer(ContainerID id) {
+ lockManager.lock(id);
+ try {
+ final ContainerInfo container = containerManager.getContainer(id);
+ final Set<ContainerReplica> replicas = containerManager
+ .getContainerReplicas(container.containerID());
+ final LifeCycleState state = container.getState();
+
+ /*
+ * We don't take any action if the container is in OPEN state.
+ */
+ if (state == LifeCycleState.OPEN) {
+ return;
+ }
+
+ /*
+ * If the container is in CLOSING state, the replicas can either
+ * be in OPEN or in CLOSING state. In both of this cases
+ * we have to resend close container command to the datanodes.
+ */
+ if (state == LifeCycleState.CLOSING) {
+ replicas.forEach(replica -> sendCloseCommand(
+ container, replica.getDatanodeDetails(), false));
+ return;
+ }
+
+ /*
+ * If the container is in QUASI_CLOSED state, check and close the
+ * container if possible.
+ */
+ if (state == LifeCycleState.QUASI_CLOSED &&
+ canForceCloseContainer(container, replicas)) {
+ forceCloseContainer(container, replicas);
+ return;
+ }
+
+ /*
+ * Before processing the container we have to reconcile the
+ * inflightReplication and inflightDeletion actions.
+ *
+ * We remove the entry from inflightReplication and inflightDeletion
+ * list, if the operation is completed or if it has timed out.
+ */
+ updateInflightAction(container, inflightReplication,
+ action -> replicas.stream()
+ .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
+
+ updateInflightAction(container, inflightDeletion,
+ action -> replicas.stream()
+ .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
+
+
+ /*
+ * We don't have to take any action if the container is healthy.
+ *
+ * According to ReplicationMonitor container is considered healthy if
+ * the container is either in QUASI_CLOSED or in CLOSED state and has
+ * exact number of replicas in the same state.
+ */
+ if (isContainerHealthy(container, replicas)) {
+ return;
+ }
+
+ /*
+ * Check if the container is under replicated and take appropriate
+ * action.
+ */
+ if (isContainerUnderReplicated(container, replicas)) {
+ handleUnderReplicatedContainer(container, replicas);
+ return;
+ }
+
+ /*
+ * Check if the container is over replicated and take appropriate
+ * action.
+ */
+ if (isContainerOverReplicated(container, replicas)) {
+ handleOverReplicatedContainer(container, replicas);
+ return;
+ }
+
+ /*
+ * The container is neither under nor over replicated and the container
+ * is not healthy. This means that the container has unhealthy/corrupted
+ * replica.
+ */
+ handleUnstableContainer(container, replicas);
+
+ } catch (ContainerNotFoundException ex) {
+ LOG.warn("Missing container {}.", id);
+ } finally {
+ lockManager.unlock(id);
+ }
+ }
+
+ /**
+ * Reconciles the InflightActions for a given container.
+ *
+ * @param container Container to update
+ * @param inflightActions inflightReplication (or) inflightDeletion
+ * @param filter filter to check if the operation is completed
+ */
+ private void updateInflightAction(final ContainerInfo container,
+ final Map<ContainerID, List<InflightAction>> inflightActions,
+ final Predicate<InflightAction> filter) {
+ final ContainerID id = container.containerID();
+ final long deadline = Time.monotonicNow() - eventTimeout;
+ if (inflightActions.containsKey(id)) {
+ final List<InflightAction> actions = inflightActions.get(id);
+ actions.removeIf(action -> action.time < deadline);
+ actions.removeIf(filter);
+ if (actions.isEmpty()) {
+ inflightActions.remove(id);
+ }
+ }
+ }
+
+ /**
+ * Returns true if the container is healthy according to ReplicationMonitor.
+ *
+ * According to ReplicationMonitor container is considered healthy if
+ * it has exact number of replicas in the same state as the container.
+ *
+ * @param container Container to check
+ * @param replicas Set of ContainerReplicas
+ * @return true if the container is healthy, false otherwise
+ */
+ private boolean isContainerHealthy(final ContainerInfo container,
+ final Set<ContainerReplica> replicas) {
+ return container.getReplicationFactor().getNumber() == replicas.size() &&
+ replicas.stream().allMatch(
+ r -> compareState(container.getState(), r.getState()));
+ }
+
+ /**
+ * Checks if the container is under replicated or not.
+ *
+ * @param container Container to check
+ * @param replicas Set of ContainerReplicas
+ * @return true if the container is under replicated, false otherwise
+ */
+ private boolean isContainerUnderReplicated(final ContainerInfo container,
+ final Set<ContainerReplica> replicas) {
+ return container.getReplicationFactor().getNumber() >
+ getReplicaCount(container.containerID(), replicas);
+ }
+
+ /**
+ * Checks if the container is over replicated or not.
+ *
+ * @param container Container to check
+ * @param replicas Set of ContainerReplicas
+ * @return true if the container if over replicated, false otherwise
+ */
+ private boolean isContainerOverReplicated(final ContainerInfo container,
+ final Set<ContainerReplica> replicas) {
+ return container.getReplicationFactor().getNumber() <
+ getReplicaCount(container.containerID(), replicas);
+ }
+
+ /**
+ * Returns the replication count of the given container. This also
+ * considers inflight replication and deletion.
+ *
+ * @param id ContainerID
+ * @param replicas Set of existing replicas
+ * @return number of estimated replicas for this container
+ */
+ private int getReplicaCount(final ContainerID id,
+ final Set<ContainerReplica> replicas) {
+ return replicas.size()
+ + inflightReplication.getOrDefault(id, Collections.emptyList()).size()
+ - inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+ }
+
+ /**
+ * Returns true if more than 50% of the container replicas with unique
+ * originNodeId are in QUASI_CLOSED state.
+ *
+ * @param container Container to check
+ * @param replicas Set of ContainerReplicas
+ * @return true if we can force close the container, false otherwise
+ */
+ private boolean canForceCloseContainer(final ContainerInfo container,
+ final Set<ContainerReplica> replicas) {
+ Preconditions.assertTrue(container.getState() ==
+ LifeCycleState.QUASI_CLOSED);
+ final int replicationFactor = container.getReplicationFactor().getNumber();
+ final long uniqueQuasiClosedReplicaCount = replicas.stream()
+ .filter(r -> r.getState() == State.QUASI_CLOSED)
+ .map(ContainerReplica::getOriginDatanodeId)
+ .distinct()
+ .count();
+ return uniqueQuasiClosedReplicaCount > (replicationFactor / 2);
+ }
+
+ /**
+ * Force close the container replica(s) with highest sequence Id.
+ *
+ * <p>
+ * Note: We should force close the container only if >50% (quorum)
+ * of replicas with unique originNodeId are in QUASI_CLOSED state.
+ * </p>
+ *
+ * @param container ContainerInfo
+ * @param replicas Set of ContainerReplicas
+ */
+ private void forceCloseContainer(final ContainerInfo container,
+ final Set<ContainerReplica> replicas) {
+ Preconditions.assertTrue(container.getState() ==
+ LifeCycleState.QUASI_CLOSED);
+
+ final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
+ .filter(r -> r.getState() == State.QUASI_CLOSED)
+ .collect(Collectors.toList());
+
+ final Long sequenceId = quasiClosedReplicas.stream()
+ .map(ContainerReplica::getSequenceId)
+ .max(Long::compare)
+ .orElse(-1L);
+
+ LOG.info("Force closing container {} with BCSID {}," +
+ " which is in QUASI_CLOSED state.",
+ container.containerID(), sequenceId);
+
+ quasiClosedReplicas.stream()
+ .filter(r -> sequenceId != -1L)
+ .filter(replica -> replica.getSequenceId().equals(sequenceId))
+ .forEach(replica -> sendCloseCommand(
+ container, replica.getDatanodeDetails(), true));
+ }
+
+  /**
+   * If the given container is under replicated, identify a new set of
+   * datanode(s) to replicate the container using ContainerPlacementPolicy
+   * and send replicate container command to the identified datanode(s).
+   * Nodes holding any replica or an inflight copy are never selected.
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleUnderReplicatedContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    try {
+      final ContainerID id = container.containerID();
+      final List<DatanodeDetails> deletionInFlight = inflightDeletion
+          .getOrDefault(id, Collections.emptyList()).stream()
+          .map(action -> action.datanode).collect(Collectors.toList());
+      final List<DatanodeDetails> source = replicas.stream()
+          .filter(r -> r.getState() == State.QUASI_CLOSED ||
+              r.getState() == State.CLOSED)
+          .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
+          .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toList());
+      if (source.size() > 0) {
+        final int replicationFactor = container
+            .getReplicationFactor().getNumber();
+        final int delta = replicationFactor - getReplicaCount(id, replicas);
+        final List<DatanodeDetails> excludeList = new ArrayList<>();
+        replicas.forEach(r -> excludeList.add(r.getDatanodeDetails()));
+        inflightReplication.getOrDefault(id, Collections.emptyList())
+            .forEach(action -> excludeList.add(action.datanode));
+        final List<DatanodeDetails> selectedDatanodes = containerPlacement
+            .chooseDatanodes(excludeList, delta, container.getUsedBytes());
+        LOG.info("Container {} is under replicated. Expected replica count" +
+            " is {}, but found {}.", id, replicationFactor,
+            replicationFactor - delta);
+        for (DatanodeDetails datanode : selectedDatanodes) {
+          sendReplicateCommand(container, datanode, source);
+        }
+      } else {
+        LOG.warn("Cannot replicate container {}, no healthy replica found.",
+            container.containerID());
+      }
+    } catch (IOException ex) {
+      LOG.warn("Exception while replicating container {}.",
+          container.getContainerID(), ex);
+    }
+  }
+
+  /**
+   * If the given container is over replicated, identify the datanode(s)
+   * to delete the container and send delete container command to the
+   * identified datanode(s). One healthy replica per origin datanode is
+   * always retained, so fewer than {@code excess} deletes may be sent.
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleOverReplicatedContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    final ContainerID id = container.containerID();
+    final int replicationFactor = container.getReplicationFactor().getNumber();
+    // Don't consider inflight replication while calculating excess here.
+    final int excess = replicas.size() - replicationFactor -
+        inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+
+    if (excess > 0) {
+      LOG.info("Container {} is over replicated. Expected replica count" +
+          " is {}, but found {}.", id, replicationFactor,
+          replicationFactor + excess);
+
+      final Map<UUID, ContainerReplica> uniqueReplicas =
+          new LinkedHashMap<>();
+      replicas.stream()
+          .filter(r -> compareState(container.getState(), r.getState()))
+          .forEach(r -> uniqueReplicas
+              .putIfAbsent(r.getOriginDatanodeId(), r));
+
+      // Retain one healthy replica per origin node Id.
+      final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
+      eligibleReplicas.removeAll(uniqueReplicas.values());
+
+      final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
+          .stream()
+          .filter(r -> !compareState(container.getState(), r.getState()))
+          .collect(Collectors.toList());
+
+      //Move the unhealthy replicas to the front of eligible replicas to delete
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      eligibleReplicas.addAll(0, unhealthyReplicas);
+
+      // Fewer replicas than `excess` may be eligible (e.g. every origin is
+      // distinct); bound the loop so get(i) cannot throw and kill SCM.
+      final int toDelete = Math.min(excess, eligibleReplicas.size());
+      for (int i = 0; i < toDelete; i++) {
+        sendDeleteCommand(container,
+            eligibleReplicas.get(i).getDatanodeDetails(), true);
+      }
+    }
+  }
+
+ /**
+ * Handles unstable container.
+ * A container is inconsistent if any of the replica state doesn't
+ * match the container state. We have to take appropriate action
+ * based on state of the replica.
+ *
+ * @param container ContainerInfo
+ * @param replicas Set of ContainerReplicas
+ */
+ private void handleUnstableContainer(final ContainerInfo container,
+ final Set<ContainerReplica> replicas) {
+ // Find unhealthy replicas
+ List<ContainerReplica> unhealthyReplicas = replicas.stream()
+ .filter(r -> !compareState(container.getState(), r.getState()))
+ .collect(Collectors.toList());
+
+ Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
+ while (iterator.hasNext()) {
+ final ContainerReplica replica = iterator.next();
+ final State state = replica.getState();
+ if (state == State.OPEN || state == State.CLOSING) {
+ sendCloseCommand(container, replica.getDatanodeDetails(), false);
+ iterator.remove();
+ }
+
+ if (state == State.QUASI_CLOSED) {
+ // Send force close command if the BCSID matches
+ if (container.getSequenceId() == replica.getSequenceId()) {
+ sendCloseCommand(container, replica.getDatanodeDetails(), true);
+ iterator.remove();
+ }
+ }
+ }
+
+ // Now we are left with the replicas which are either unhealthy or
+ // the BCSID doesn't match. These replicas should be deleted.
+
+ /*
+ * If we have unhealthy replicas we go under replicated and then
+ * replicate the healthy copy.
+ *
+ * We also make sure that we delete only one unhealthy replica at a time.
+ *
+ * If there are two unhealthy replica:
+ * - Delete first unhealthy replica
+ * - Re-replicate the healthy copy
+ * - Delete second unhealthy replica
+ * - Re-replicate the healthy copy
+ *
+ * Note: Only one action will be executed in a single ReplicationMonitor
+ * iteration. So to complete all the above actions we need four
+ * ReplicationMonitor iterations.
+ */
+
+ unhealthyReplicas.stream().findFirst().ifPresent(replica ->
+ sendDeleteCommand(container, replica.getDatanodeDetails(), false));
+
+ }
+
+ /**
+ * Sends close container command for the given container to the given
+ * datanode.
+ *
+ * @param container Container to be closed
+ * @param datanode The datanode on which the container
+ * has to be closed
+ * @param force Should be set to true if we want to close a
+ * QUASI_CLOSED container
+ */
+ private void sendCloseCommand(final ContainerInfo container,
+ final DatanodeDetails datanode,
+ final boolean force) {
+
+ LOG.info("Sending close container command for container {}" +
+ " to datanode {}.", container.containerID(), datanode);
+
+ CloseContainerCommand closeContainerCommand =
+ new CloseContainerCommand(container.getContainerID(),
+ container.getPipelineID(), force);
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+ new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand));
+ }
+
+ /**
+ * Sends replicate container command for the given container to the given
+ * datanode.
+ *
+ * @param container Container to be replicated
+ * @param datanode The destination datanode to replicate
+ * @param sources List of source nodes from where we can replicate
+ */
+ private void sendReplicateCommand(final ContainerInfo container,
+ final DatanodeDetails datanode,
+ final List<DatanodeDetails> sources) {
+
+ LOG.info("Sending replicate container command for container {}" +
+ " to datanode {}", container.containerID(), datanode);
+
+ final ContainerID id = container.containerID();
+ final ReplicateContainerCommand replicateCommand =
+ new ReplicateContainerCommand(id.getId(), sources);
+ inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
+ sendAndTrackDatanodeCommand(datanode, replicateCommand,
+ action -> inflightReplication.get(id).add(action));
+ }
+
+ /**
+ * Sends delete container command for the given container to the given
+ * datanode.
+ *
+ * @param container Container to be deleted
+ * @param datanode The datanode on which the replica should be deleted
+ * @param force Should be set to true to delete an OPEN replica
+ */
+ private void sendDeleteCommand(final ContainerInfo container,
+ final DatanodeDetails datanode,
+ final boolean force) {
+
+ LOG.info("Sending delete container command for container {}" +
+ " to datanode {}", container.containerID(), datanode);
+
+ final ContainerID id = container.containerID();
+ final DeleteContainerCommand deleteCommand =
+ new DeleteContainerCommand(id.getId(), force);
+ inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
+ sendAndTrackDatanodeCommand(datanode, deleteCommand,
+ action -> inflightDeletion.get(id).add(action));
+ }
+
+ /**
+ * Creates CommandForDatanode with the given SCMCommand and fires
+ * DATANODE_COMMAND event to event queue.
+ *
+ * Tracks the command using the given tracker.
+ *
+ * @param datanode Datanode to which the command has to be sent
+ * @param command SCMCommand to be sent
+ * @param tracker Tracker which tracks the inflight actions
+ * @param <T> Type of SCMCommand
+ */
+ private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
+ final DatanodeDetails datanode,
+ final SCMCommand<T> command,
+ final Consumer<InflightAction> tracker) {
+ final CommandForDatanode<T> datanodeCommand =
+ new CommandForDatanode<>(datanode.getUuid(), command);
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+ tracker.accept(new InflightAction(datanode, Time.monotonicNow()));
+ }
+
+ /**
+ * Compares the container state with the replica state.
+ *
+ * @param containerState ContainerState
+ * @param replicaState ReplicaState
+ * @return true if the state matches, false otherwise
+ */
+ private static boolean compareState(final LifeCycleState containerState,
+ final State replicaState) {
+ switch (containerState) {
+ case OPEN:
+ return replicaState == State.OPEN;
+ case CLOSING:
+ return replicaState == State.CLOSING;
+ case QUASI_CLOSED:
+ return replicaState == State.QUASI_CLOSED;
+ case CLOSED:
+ return replicaState == State.CLOSED;
+ case DELETING:
+ return false;
+ case DELETED:
+ return false;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Wrapper class to hold the InflightAction with its start time.
+ */
+ private static final class InflightAction {
+
+ private final DatanodeDetails datanode;
+ private final long time;
+
+ private InflightAction(final DatanodeDetails datanode,
+ final long time) {
+ this.datanode = datanode;
+ this.time = time;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 2615289..6dd1949 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -132,6 +132,16 @@ public class SCMContainerManager implements ContainerManager {
}
@Override
+ public Set<ContainerID> getContainerIDs() {
+ lock.lock();
+ try {
+ return containerStateManager.getAllContainerIDs();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
public List<ContainerInfo> getContainers() {
lock.lock();
try {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 0c738b2..2aba724 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
@@ -107,9 +106,9 @@ public class ContainerStateMap {
this.ownerMap = new ContainerAttribute<>();
this.factorMap = new ContainerAttribute<>();
this.typeMap = new ContainerAttribute<>();
- this.containerMap = new HashMap<>();
+ this.containerMap = new ConcurrentHashMap<>();
this.lock = new ReentrantReadWriteLock();
- this.replicaMap = new HashMap<>();
+ this.replicaMap = new ConcurrentHashMap<>();
this.resultCache = new ConcurrentHashMap<>();
}
@@ -208,7 +207,7 @@ public class ContainerStateMap {
try {
checkIfContainerExist(containerID);
return Collections
- .unmodifiableSet(new HashSet<>(replicaMap.get(containerID)));
+ .unmodifiableSet(replicaMap.get(containerID));
} finally {
lock.readLock().unlock();
}
@@ -342,7 +341,7 @@ public class ContainerStateMap {
}
public Set<ContainerID> getAllContainerIDs() {
- return containerMap.keySet();
+ // Read-only view backed by containerMap, not a copy: it reflects later
+ // additions and removals, and this method takes no lock. Because
+ // containerMap is a ConcurrentHashMap, iterating the view is weakly
+ // consistent rather than failing with ConcurrentModificationException.
+ return Collections.unmodifiableSet(containerMap.keySet());
}
/**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index a5284e4..19c35fd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -30,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server
@@ -66,7 +69,9 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -98,7 +103,7 @@ public final class TestUtils {
*
* @return DatanodeDetails
*/
- private static DatanodeDetails createDatanodeDetails(UUID uuid) {
+ public static DatanodeDetails createDatanodeDetails(UUID uuid) {
String ipAddress = random.nextInt(256)
+ "." + random.nextInt(256)
+ "." + random.nextInt(256)
@@ -521,4 +526,50 @@ public final class TestUtils {
return new StorageContainerManager(conf, configurator);
}
+  /**
+   * Builds a RATIS container with replication factor THREE, owner "TEST"
+   * and a random container ID.
+   *
+   * @param state lifecycle state the container starts in
+   * @return the new ContainerInfo
+   */
+  public static ContainerInfo getContainer(
+      final HddsProtos.LifeCycleState state) {
+    final long randomId = RandomUtils.nextLong();
+    return new ContainerInfo.Builder()
+        .setContainerID(randomId)
+        .setReplicationType(HddsProtos.ReplicationType.RATIS)
+        .setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
+        .setState(state)
+        .setOwner("TEST")
+        .build();
+  }
+
+  /**
+   * Creates one replica of {@code containerId} per given datanode, all in
+   * {@code state}, using a sequence ID of 10000.
+   *
+   * @param containerId container the replicas belong to
+   * @param state state of every replica
+   * @param datanodeDetails datanodes hosting the replicas
+   * @return a mutable set holding the created replicas
+   */
+  public static Set<ContainerReplica> getReplicas(
+      final ContainerID containerId,
+      final ContainerReplicaProto.State state,
+      final DatanodeDetails... datanodeDetails) {
+    final long defaultSequenceId = 10000L;
+    return getReplicas(containerId, state, defaultSequenceId,
+        datanodeDetails);
+  }
+
+  /**
+   * Creates one replica of {@code containerId} per given datanode, all in
+   * {@code state} with the given sequence ID. Each replica records its own
+   * hosting datanode's UUID as its origin node ID.
+   *
+   * @param containerId container the replicas belong to
+   * @param state state of every replica
+   * @param sequenceId sequence ID recorded on every replica
+   * @param datanodeDetails datanodes hosting the replicas
+   * @return a mutable set holding the created replicas
+   */
+  public static Set<ContainerReplica> getReplicas(
+      final ContainerID containerId,
+      final ContainerReplicaProto.State state,
+      final long sequenceId,
+      final DatanodeDetails... datanodeDetails) {
+    final Set<ContainerReplica> result = new HashSet<>();
+    for (int i = 0; i < datanodeDetails.length; i++) {
+      final DatanodeDetails node = datanodeDetails[i];
+      result.add(getReplicas(containerId, state, sequenceId,
+          node.getUuid(), node));
+    }
+    return result;
+  }
+
+ /**
+ * Creates a single replica of the given container (despite the plural
+ * method name) hosted on {@code datanodeDetails}.
+ *
+ * @param containerId container the replica belongs to
+ * @param state state of the replica
+ * @param sequenceId sequence ID recorded on the replica
+ * @param originNodeId origin node ID recorded on the replica; callers may
+ * pass a UUID that differs from the hosting datanode's UUID
+ * @param datanodeDetails datanode hosting the replica
+ * @return the built replica
+ */
+ public static ContainerReplica getReplicas(
+ final ContainerID containerId,
+ final ContainerReplicaProto.State state,
+ final long sequenceId,
+ final UUID originNodeId,
+ final DatanodeDetails datanodeDetails) {
+ return ContainerReplica.newBuilder()
+ .setContainerID(containerId)
+ .setContainerState(state)
+ .setDatanodeDetails(datanodeDetails)
+ .setOriginNodeId(originNodeId)
+ .setSequenceId(sequenceId)
+ .build();
+ }
+
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 864a1a9..0b7cae4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -47,10 +47,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.getReplicas;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils
+ .getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils
+ .getContainer;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.addContainerToContainerManager;
import static org.apache.hadoop.hdds.scm.container
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
index 0fb50a4..860ec4d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
@@ -16,19 +16,12 @@
*/
package org.apache.hadoop.hdds.scm.container;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Set;
/**
@@ -77,37 +70,4 @@ public final class TestContainerReportHelper {
containerInfo.containerID(), event);
}
- public static ContainerInfo getContainer(final LifeCycleState state) {
- return new ContainerInfo.Builder()
- .setContainerID(RandomUtils.nextLong())
- .setReplicationType(ReplicationType.RATIS)
- .setReplicationFactor(ReplicationFactor.THREE)
- .setState(state)
- .build();
- }
-
- static Set<ContainerReplica> getReplicas(
- final ContainerID containerId,
- final ContainerReplicaProto.State state,
- final DatanodeDetails... datanodeDetails) {
- return getReplicas(containerId, state, 10000L, datanodeDetails);
- }
-
- static Set<ContainerReplica> getReplicas(
- final ContainerID containerId,
- final ContainerReplicaProto.State state,
- final long sequenceId,
- final DatanodeDetails... datanodeDetails) {
- Set<ContainerReplica> replicas = new HashSet<>();
- for (DatanodeDetails datanode : datanodeDetails) {
- replicas.add(ContainerReplica.newBuilder()
- .setContainerID(containerId)
- .setContainerState(state)
- .setDatanodeDetails(datanode)
- .setOriginNodeId(datanode.getUuid())
- .setSequenceId(sequenceId)
- .build());
- }
- return replicas;
- }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index 23e96dd..6c9383f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -38,10 +38,8 @@ import java.util.Set;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.addContainerToContainerManager;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.getContainer;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.mockUpdateContainerReplica;
import static org.apache.hadoop.hdds.scm.container
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
new file mode 100644
index 0000000..83b9aa3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hadoop.hdds.scm.TestUtils.createDatanodeDetails;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils.randomDatanodeDetails;
+
+/**
+ * Test cases to verify the functionality of ReplicationManager.
+ */
+public class TestReplicationManager {
+
+ private ReplicationManager replicationManager;
+ private ContainerStateManager containerStateManager;
+ private ContainerPlacementPolicy containerPlacementPolicy;
+ private EventQueue eventQueue;
+ private DatanodeCommandHandler datanodeCommandHandler;
+
+ @Before
+ public void setup() throws IOException, InterruptedException {
+ final Configuration conf = new OzoneConfiguration();
+ final ContainerManager containerManager =
+ Mockito.mock(ContainerManager.class);
+ eventQueue = new EventQueue();
+ containerStateManager = new ContainerStateManager(conf);
+
+ datanodeCommandHandler = new DatanodeCommandHandler();
+ eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
+
+ Mockito.when(containerManager.getContainerIDs())
+ .thenAnswer(invocation -> containerStateManager.getAllContainerIDs());
+
+ Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+ .thenAnswer(invocation -> containerStateManager
+ .getContainer((ContainerID)invocation.getArguments()[0]));
+
+ Mockito.when(containerManager.getContainerReplicas(
+ Mockito.any(ContainerID.class)))
+ .thenAnswer(invocation -> containerStateManager
+ .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+
+ containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class);
+
+ Mockito.when(containerPlacementPolicy.chooseDatanodes(
+ Mockito.anyListOf(DatanodeDetails.class),
+ Mockito.anyInt(), Mockito.anyLong()))
+ .thenAnswer(invocation -> {
+ int count = (int) invocation.getArguments()[1];
+ return IntStream.range(0, count)
+ .mapToObj(i -> randomDatanodeDetails())
+ .collect(Collectors.toList());
+ });
+
+ replicationManager = new ReplicationManager(
+ conf, containerManager, containerPlacementPolicy, eventQueue);
+ replicationManager.start();
+ Thread.sleep(100L);
+ }
+
+ /**
+ * Open containers are not handled by ReplicationManager.
+ * This test-case makes sure that ReplicationManages doesn't take
+ * any action on OPEN containers.
+ */
+ @Test
+ public void testOpenContainer() throws SCMException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.OPEN);
+ containerStateManager.loadContainer(container);
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+
+ }
+
+ /**
+ * If the container is in CLOSING state we resend close container command
+ * to all the datanodes.
+ */
+ @Test
+ public void testClosingContainer() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
+ final ContainerID id = container.containerID();
+
+ containerStateManager.loadContainer(container);
+
+ // Two replicas in CLOSING state
+ final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
+ randomDatanodeDetails(),
+ randomDatanodeDetails());
+
+ // One replica in OPEN state
+ final DatanodeDetails datanode = randomDatanodeDetails();
+ replicas.addAll(getReplicas(id, State.OPEN, datanode));
+
+ for (ContainerReplica replica : replicas) {
+ containerStateManager.updateContainerReplica(id, replica);
+ }
+
+ final int currentCloseCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+
+ // Update the OPEN to CLOSING
+ for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
+ containerStateManager.updateContainerReplica(id, replica);
+ }
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+ }
+
+
+ /**
+ * The container is QUASI_CLOSED but two of the replica is still in
+ * open state. ReplicationManager should resend close command to those
+ * datanodes.
+ */
+ @Test
+ public void testQuasiClosedContainerWithTwoOpenReplica() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, State.OPEN, 1000L, originNodeId, randomDatanodeDetails());
+ final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+ final ContainerReplica replicaThree = getReplicas(
+ id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
+
+ containerStateManager.loadContainer(container);
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+ containerStateManager.updateContainerReplica(id, replicaThree);
+
+ final int currentCloseCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+ // Two of the replicas are in OPEN state
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.closeContainerCommand,
+ replicaTwo.getDatanodeDetails()));
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.closeContainerCommand,
+ replicaThree.getDatanodeDetails()));
+ }
+
+ /**
+ * When the container is in QUASI_CLOSED state and all the replicas are
+ * also in QUASI_CLOSED state and doesn't have a quorum to force close
+ * the container, ReplicationManager will not do anything.
+ */
+ @Test
+ public void testHealthyQuasiClosedContainer() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaThree = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+ containerStateManager.loadContainer(container);
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+ containerStateManager.updateContainerReplica(id, replicaThree);
+
+ // All the QUASI_CLOSED replicas have same originNodeId, so the
+ // container will not be closed. ReplicationManager should take no action.
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+ }
+
+ /**
+ * When a container is QUASI_CLOSED and we don't have quorum to force close
+ * the container, the container should have all the replicas in QUASI_CLOSED
+ * state, else ReplicationManager will take action.
+ *
+ * In this test case we make one of the replica unhealthy, replication manager
+ * will send delete container command to the datanode which has the unhealthy
+ * replica.
+ */
+ @Test
+ public void testQuasiClosedContainerWithUnhealthyReplica()
+ throws SCMException, ContainerNotFoundException, InterruptedException,
+ ContainerReplicaNotFoundException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaThree = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+ containerStateManager.loadContainer(container);
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+ containerStateManager.updateContainerReplica(id, replicaThree);
+
+ final int currentDeleteCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+ final int currentReplicateCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
+ // All the QUASI_CLOSED replicas have same originNodeId, so the
+ // container will not be closed. ReplicationManager should take no action.
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+
+ // Make the first replica unhealthy
+ final ContainerReplica unhealthyReplica = getReplicas(
+ id, State.UNHEALTHY, 1000L, originNodeId,
+ replicaOne.getDatanodeDetails());
+ containerStateManager.updateContainerReplica(id, unhealthyReplica);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
+ replicaOne.getDatanodeDetails()));
+
+ // Now we will delete the unhealthy replica from in-memory.
+ containerStateManager.removeContainerReplica(id, replicaOne);
+
+ // The container is under replicated as unhealthy replica is removed
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+
+ // We should get replicate command
+ Assert.assertEquals(currentReplicateCommandCount + 1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ }
+
+ /**
+ * When a QUASI_CLOSED container is over replicated, ReplicationManager
+ * deletes the excess replicas.
+ */
+ @Test
+ public void testOverReplicatedQuasiClosedContainer() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaThree = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaFour = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+ containerStateManager.loadContainer(container);
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+ containerStateManager.updateContainerReplica(id, replicaThree);
+ containerStateManager.updateContainerReplica(id, replicaFour);
+
+ final int currentDeleteCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ }
+
+ /**
+ * When a QUASI_CLOSED container is over replicated, ReplicationManager
+ * deletes the excess replicas. While choosing the replica for deletion
+ * ReplicationManager should prioritize unhealthy replica over QUASI_CLOSED
+ * replica.
+ */
+ @Test
+ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
+ throws SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaThree = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaFour = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+ containerStateManager.loadContainer(container);
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+ containerStateManager.updateContainerReplica(id, replicaThree);
+ containerStateManager.updateContainerReplica(id, replicaFour);
+
+ final int currentDeleteCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
+ replicaOne.getDatanodeDetails()));
+ }
+
+ /**
+ * ReplicationManager should replicate an QUASI_CLOSED replica if it is
+ * under replicated.
+ */
+ @Test
+ public void testUnderReplicatedQuasiClosedContainer() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+ containerStateManager.loadContainer(container);
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+
+ final int currentReplicateCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentReplicateCommandCount + 1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ }
+
+ /**
+ * When a QUASI_CLOSED container is under replicated, ReplicationManager
+ * should re-replicate it. If there are any unhealthy replica, it has to
+ * be deleted.
+ *
+ * In this test case, the container is QUASI_CLOSED and is under replicated
+ * and also has an unhealthy replica.
+ *
+ * In the first iteration of ReplicationManager, it should re-replicate
+ * the container so that it has enough replicas.
+ *
+ * In the second iteration, ReplicationManager should delete the unhealthy
+ * replica.
+ *
+ * In the third iteration, ReplicationManager will re-replicate as the
+ * container has again become under replicated after the unhealthy
+ * replica has been deleted.
+ *
+ */
+ @Test
+ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
+ throws SCMException, ContainerNotFoundException, InterruptedException,
+ ContainerReplicaNotFoundException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
+
+ containerStateManager.loadContainer(container);
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+
+ final int currentReplicateCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+ final int currentDeleteCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentReplicateCommandCount + 1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+
+ Optional<CommandForDatanode> replicateCommand = datanodeCommandHandler
+ .getReceivedCommands().stream()
+ .filter(c -> c.getCommand().getType()
+ .equals(SCMCommandProto.Type.replicateContainerCommand))
+ .findFirst();
+
+ Assert.assertTrue(replicateCommand.isPresent());
+
+ DatanodeDetails newNode = createDatanodeDetails(
+ replicateCommand.get().getDatanodeId());
+ ContainerReplica newReplica = getReplicas(
+ id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
+ containerStateManager.updateContainerReplica(id, newReplica);
+
+ /*
+ * We have report the replica to SCM, in the next ReplicationManager
+ * iteration it should delete the unhealthy replica.
+ */
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+ // ReplicaTwo should be deleted, that is the unhealthy one
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
+ replicaTwo.getDatanodeDetails()));
+
+ containerStateManager.removeContainerReplica(id, replicaTwo);
+
+ /*
+ * We have now removed unhealthy replica, next iteration of
+ * ReplicationManager should re-replicate the container as it
+ * is under replicated now
+ */
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+ Assert.assertEquals(currentReplicateCommandCount + 2,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ }
+
+
+ /**
+ * When a container is QUASI_CLOSED and it has >50% of its replica
+ * in QUASI_CLOSED state with unique origin node id,
+ * ReplicationManager should force close the replica(s) with
+ * highest BCSID.
+ */
+ @Test
+ public void testQuasiClosedToClosed() throws
+ SCMException, ContainerNotFoundException, InterruptedException {
+ final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
+ randomDatanodeDetails(),
+ randomDatanodeDetails(),
+ randomDatanodeDetails());
+ containerStateManager.loadContainer(container);
+ for (ContainerReplica replica : replicas) {
+ containerStateManager.updateContainerReplica(id, replica);
+ }
+
+ final int currentCloseCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+ replicationManager.processContainersNow();
+ // Wait for EventQueue to call the event handler
+ Thread.sleep(100L);
+
+ // All the replicas have same BCSID, so all of them will be closed.
+ Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+
+ }
+
+
+ /**
+  * ReplicationManager should not take any action if the container is
+  * CLOSED and healthy (here: three CLOSED replicas on distinct datanodes).
+  */
+ @Test
+ public void testHealthyClosedContainer()
+     throws SCMException, ContainerNotFoundException, InterruptedException {
+   final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+   final ContainerID id = container.containerID();
+   final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
+       randomDatanodeDetails(),
+       randomDatanodeDetails(),
+       randomDatanodeDetails());
+
+   containerStateManager.loadContainer(container);
+   for (ContainerReplica replica : replicas) {
+     containerStateManager.updateContainerReplica(id, replica);
+   }
+
+   // Snapshot the total so the assertion checks "no new commands" rather
+   // than relying on the handler being empty, matching the delta-based
+   // checks used by the other tests.
+   final int currentInvocationCount = datanodeCommandHandler.getInvocation();
+
+   replicationManager.processContainersNow();
+   // Wait for EventQueue to call the event handler
+   Thread.sleep(100L);
+   Assert.assertEquals(currentInvocationCount,
+       datanodeCommandHandler.getInvocation());
+ }
+
+ /**
+  * Releases per-test resources.
+  *
+  * @throws IOException if closing the container state manager fails
+  */
+ @After
+ public void teardown() throws IOException {
+   // Stop the ReplicationManager before closing the state manager it reads
+   // from. The finally block still closes the state manager if stop() throws.
+   // Previously a failing close() skipped stop() entirely, which could leak
+   // the manager into later tests.
+   try {
+     replicationManager.stop();
+   } finally {
+     containerStateManager.close();
+   }
+ }
+
+ /**
+  * Test EventHandler that records every CommandForDatanode delivered to it,
+  * so tests can assert which commands ReplicationManager sent and to whom.
+  *
+  * <p>onMessage is invoked via the EventQueue while the test thread reads
+  * the recorded state, and the tests' Thread.sleep does not establish a
+  * happens-before edge. All state is therefore kept in thread-safe
+  * containers. (NOTE(review): assumes the EventQueue dispatches on a
+  * separate executor thread — confirm against EventQueue.)
+  */
+ private static class DatanodeCommandHandler implements
+     EventHandler<CommandForDatanode> {
+
+   private final AtomicInteger invocation = new AtomicInteger(0);
+   private final Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
+       new java.util.concurrent.ConcurrentHashMap<>();
+   private final List<CommandForDatanode> commands =
+       new java.util.concurrent.CopyOnWriteArrayList<>();
+
+   @Override
+   public void onMessage(final CommandForDatanode command,
+       final EventPublisher publisher) {
+     final SCMCommandProto.Type type = command.getCommand().getType();
+     commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0))
+         .incrementAndGet();
+     invocation.incrementAndGet();
+     commands.add(command);
+   }
+
+   /**
+    * @return total number of commands received, across all command types
+    */
+   private int getInvocation() {
+     return invocation.get();
+   }
+
+   /**
+    * @param type command type to look up
+    * @return number of commands of the given type received, or 0 if none
+    */
+   private int getInvocationCount(SCMCommandProto.Type type) {
+     final AtomicInteger count = commandInvocation.get(type);
+     return count == null ? 0 : count.get();
+   }
+
+   /**
+    * @return the live list of received commands, in arrival order
+    */
+   private List<CommandForDatanode> getReceivedCommands() {
+     return commands;
+   }
+
+   /**
+    * Returns true if the command handler has received the given
+    * command type for the provided datanode.
+    *
+    * @param type Command Type
+    * @param datanode DatanodeDetails
+    * @return True if command was received, false otherwise
+    */
+   private boolean received(final SCMCommandProto.Type type,
+       final DatanodeDetails datanode) {
+     return commands.stream().anyMatch(dc ->
+         dc.getCommand().getType().equals(type) &&
+         dc.getDatanodeId().equals(datanode.getUuid()));
+   }
+ }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org