You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/04/04 11:03:05 UTC
[hadoop] branch trunk updated: HDDS-1207. Refactor Container Report
Processing logic and plugin new Replication Manager. (#662)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 48a58bc HDDS-1207. Refactor Container Report Processing logic and plugin new Replication Manager. (#662)
48a58bc is described below
commit 48a58bce37dfddf37a4a6888228f7c7fc80bccdd
Author: Nanda kumar <na...@gmail.com>
AuthorDate: Thu Apr 4 16:32:59 2019 +0530
HDDS-1207. Refactor Container Report Processing logic and plugin new Replication Manager. (#662)
---
.../hadoop/hdds/scm/container/ContainerInfo.java | 7 -
.../hdds/scm/chillmode/ChillModeHandler.java | 36 +-
.../container/AbstractContainerReportHandler.java | 236 +++++++++
.../hdds/scm/container/ContainerReportHandler.java | 234 ++++-----
.../hdds/scm/container/ContainerStateManager.java | 19 +-
.../IncrementalContainerReportHandler.java | 47 +-
.../hdds/scm/container/ReplicationManager.java | 9 +
.../hdds/scm/container/ReportHandlerHelper.java | 365 -------------
.../hdds/scm/container/SCMContainerManager.java | 23 +-
.../scm/container/states/ContainerStateMap.java | 8 +-
.../hadoop/hdds/scm/server/SCMConfigurator.java | 2 +-
.../hdds/scm/server/StorageContainerManager.java | 37 +-
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 1 +
.../hdds/scm/chillmode/TestChillModeHandler.java | 28 +-
.../scm/container/TestContainerReportHandler.java | 585 ++++++++-------------
.../scm/container/TestContainerReportHelper.java | 73 ---
.../TestIncrementalContainerReportHandler.java | 158 +++---
.../replication/TestReplicationActivityStatus.java | 85 ---
.../scm/server/TestSCMClientProtocolServer.java | 8 +-
19 files changed, 723 insertions(+), 1238 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 05d4e77..7b5c467 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@ -108,13 +108,6 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
this.replicationType = repType;
}
- public ContainerInfo(ContainerInfo info) {
- this(info.getContainerID(), info.getState(), info.getPipelineID(),
- info.getUsedBytes(), info.getNumberOfKeys(),
- info.getStateEnterTime(), info.getOwner(),
- info.getDeleteTransactionId(), info.getSequenceId(),
- info.getReplicationFactor(), info.getReplicationType());
- }
/**
* Needed for serialization findbugs.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java
index 95e0d93..fff1fb2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java
@@ -20,8 +20,7 @@ package org.apache.hadoop.hdds.scm.chillmode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
-import org.apache.hadoop.hdds.scm.container.replication.
- ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -41,7 +40,7 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
private final BlockManager scmBlockManager;
private final long waitTime;
private final AtomicBoolean isInChillMode = new AtomicBoolean(true);
- private final ReplicationActivityStatus replicationActivityStatus;
+ private final ReplicationManager replicationManager;
/**
@@ -49,27 +48,27 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
* @param configuration
* @param clientProtocolServer
* @param blockManager
- * @param replicationStatus
+ * @param replicationManager
*/
public ChillModeHandler(Configuration configuration,
SCMClientProtocolServer clientProtocolServer,
BlockManager blockManager,
- ReplicationActivityStatus replicationStatus) {
+ ReplicationManager replicationManager) {
Objects.requireNonNull(configuration, "Configuration cannot be null");
Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " +
"object cannot be null");
Objects.requireNonNull(blockManager, "BlockManager object cannot be null");
- Objects.requireNonNull(replicationStatus, "ReplicationActivityStatus " +
+ Objects.requireNonNull(replicationManager, "ReplicationManager " +
"object cannot be null");
this.waitTime = configuration.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
- scmClientProtocolServer = clientProtocolServer;
- scmBlockManager = blockManager;
- replicationActivityStatus = replicationStatus;
+ this.scmClientProtocolServer = clientProtocolServer;
+ this.scmBlockManager = blockManager;
+ this.replicationManager = replicationManager;
- boolean chillModeEnabled = configuration.getBoolean(
+ final boolean chillModeEnabled = configuration.getBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
isInChillMode.set(chillModeEnabled);
@@ -89,13 +88,16 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
@Override
public void onMessage(ChillModeStatus chillModeStatus,
EventPublisher publisher) {
- isInChillMode.set(chillModeStatus.getChillModeStatus());
-
- replicationActivityStatus.fireReplicationStart(isInChillMode.get(),
- waitTime);
- scmClientProtocolServer.setChillModeStatus(isInChillMode.get());
- scmBlockManager.setChillModeStatus(isInChillMode.get());
-
+ try {
+ isInChillMode.set(chillModeStatus.getChillModeStatus());
+ scmClientProtocolServer.setChillModeStatus(isInChillMode.get());
+ scmBlockManager.setChillModeStatus(isInChillMode.get());
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ replicationManager.start();
+ }
}
public boolean getChillModeStatus() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
new file mode 100644
index 0000000..f660442
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+/**
+ * Base class for all the container report handlers.
+ */
+public class AbstractContainerReportHandler {
+
+ private final ContainerManager containerManager;
+ private final Logger logger;
+
+ /**
+ * Constructs AbstractContainerReportHandler instance with the
+ * given ContainerManager instance.
+ *
+ * @param containerManager ContainerManager
+ * @param logger Logger to be used for logging
+ */
+ AbstractContainerReportHandler(final ContainerManager containerManager,
+ final Logger logger) {
+ Preconditions.checkNotNull(containerManager);
+ Preconditions.checkNotNull(logger);
+ this.containerManager = containerManager;
+ this.logger = logger;
+ }
+
+ /**
+ * Process the given ContainerReplica received from specified datanode.
+ *
+ * @param datanodeDetails DatanodeDetails of the node which reported
+ * this replica
+ * @param replicaProto ContainerReplica
+ *
+ * @throws IOException In case of any Exception while processing the report
+ */
+ void processContainerReplica(final DatanodeDetails datanodeDetails,
+ final ContainerReplicaProto replicaProto)
+ throws IOException {
+ final ContainerID containerId = ContainerID
+ .valueof(replicaProto.getContainerID());
+ final ContainerReplica replica = ContainerReplica.newBuilder()
+ .setContainerID(containerId)
+ .setContainerState(replicaProto.getState())
+ .setDatanodeDetails(datanodeDetails)
+ .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
+ .setSequenceId(replicaProto.getBlockCommitSequenceId())
+ .build();
+
+ logger.debug("Processing replica of container {} from datanode {}",
+ containerId, datanodeDetails);
+ // Synchronized block should be replaced by container lock,
+ // once we have introduced lock inside ContainerInfo.
+ synchronized (containerManager.getContainer(containerId)) {
+ updateContainerStats(containerId, replicaProto);
+ updateContainerState(datanodeDetails, containerId, replica);
+ containerManager.updateContainerReplica(containerId, replica);
+ }
+ }
+
+ /**
+ * Update the container stats if it's lagging behind the stats in reported
+ * replica.
+ *
+ * @param containerId ID of the container
+ * @param replicaProto Container Replica information
+ * @throws ContainerNotFoundException If the container is not present
+ */
+ private void updateContainerStats(final ContainerID containerId,
+ final ContainerReplicaProto replicaProto)
+ throws ContainerNotFoundException {
+
+ if (!isUnhealthy(replicaProto::getState)) {
+ final ContainerInfo containerInfo = containerManager
+ .getContainer(containerId);
+
+ if (containerInfo.getSequenceId() <
+ replicaProto.getBlockCommitSequenceId()) {
+ containerInfo.updateSequenceId(
+ replicaProto.getBlockCommitSequenceId());
+ }
+ if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
+ containerInfo.setUsedBytes(replicaProto.getUsed());
+ }
+ if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
+ containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
+ }
+ }
+ }
+
+ /**
+ * Updates the container state based on the given replica state.
+ *
+ * @param datanode Datanode from which the report is received
+ * @param containerId ID of the container
+ * @param replica ContainerReplica
+ * @throws IOException In case of Exception
+ */
+ private void updateContainerState(final DatanodeDetails datanode,
+ final ContainerID containerId,
+ final ContainerReplica replica)
+ throws IOException {
+
+ final ContainerInfo container = containerManager
+ .getContainer(containerId);
+
+ switch (container.getState()) {
+ case OPEN:
+ /*
+ * If the state of a container is OPEN, datanodes cannot report
+ * any other state.
+ */
+ if (replica.getState() != State.OPEN) {
+ logger.warn("Container {} is in OPEN state, but the datanode {} " +
+ "reports an {} replica.", containerId,
+ datanode, replica.getState());
+ // Should we take some action?
+ }
+ break;
+ case CLOSING:
+ /*
+ * When the container is in CLOSING state the replicas can be in any
+ * of the following states:
+ *
+ * - OPEN
+ * - CLOSING
+ * - QUASI_CLOSED
+ * - CLOSED
+ *
+ * If all the replicas are either in OPEN or CLOSING state, do nothing.
+ *
+ * If the replica is in QUASI_CLOSED state, move the container to
+ * QUASI_CLOSED state.
+ *
+ * If the replica is in CLOSED state, mark the container as CLOSED.
+ *
+ */
+
+ if (replica.getState() == State.QUASI_CLOSED) {
+ logger.info("Moving container {} to QUASI_CLOSED state, datanode {} " +
+ "reported QUASI_CLOSED replica.", containerId, datanode);
+ containerManager.updateContainerState(containerId,
+ LifeCycleEvent.QUASI_CLOSE);
+ }
+
+ if (replica.getState() == State.CLOSED) {
+ logger.info("Moving container {} to CLOSED state, datanode {} " +
+ "reported CLOSED replica.", containerId, datanode);
+ Preconditions.checkArgument(replica.getSequenceId()
+ == container.getSequenceId());
+ containerManager.updateContainerState(containerId,
+ LifeCycleEvent.CLOSE);
+ }
+
+ break;
+ case QUASI_CLOSED:
+ /*
+ * The container is in QUASI_CLOSED state; this means that at least
+ * one of the replicas was QUASI_CLOSED.
+ *
+ * Now replicas can be in any of the following states.
+ *
+ * 1. OPEN
+ * 2. CLOSING
+ * 3. QUASI_CLOSED
+ * 4. CLOSED
+ *
+ * If at least one of the replicas is in CLOSED state, mark the
+ * container as CLOSED.
+ *
+ */
+ if (replica.getState() == State.CLOSED) {
+ logger.info("Moving container {} to CLOSED state, datanode {} " +
+ "reported CLOSED replica.", containerId, datanode);
+ Preconditions.checkArgument(replica.getSequenceId()
+ == container.getSequenceId());
+ containerManager.updateContainerState(containerId,
+ LifeCycleEvent.FORCE_CLOSE);
+ }
+ break;
+ case CLOSED:
+ /*
+ * The container is already in CLOSED state. Do nothing.
+ */
+ break;
+ case DELETING:
+ throw new UnsupportedOperationException(
+ "Unsupported container state 'DELETING'.");
+ case DELETED:
+ throw new UnsupportedOperationException(
+ "Unsupported container state 'DELETED'.");
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Returns true if the container replica is marked as UNHEALTHY.
+ *
+ * @param replicaState State of the container replica.
+ * @return true if unhealthy, false otherwise
+ */
+ private boolean isUnhealthy(final Supplier<State> replicaState) {
+ return replicaState.get() == ContainerReplicaProto.State.UNHEALTHY;
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 4500786..934b244 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -15,13 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.container;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
@@ -29,115 +24,85 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.container.replication
- .ReplicationActivityStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.server
- .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* Handles container reports from datanode.
*/
-public class ContainerReportHandler implements
- EventHandler<ContainerReportFromDatanode> {
+public class ContainerReportHandler extends AbstractContainerReportHandler
+ implements EventHandler<ContainerReportFromDatanode> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerReportHandler.class);
private final NodeManager nodeManager;
- private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
- private final ReplicationActivityStatus replicationStatus;
+ /**
+ * Constructs ContainerReportHandler instance with the
+ * given NodeManager and ContainerManager instance.
+ *
+ * @param nodeManager NodeManager instance
+ * @param containerManager ContainerManager instance
+ */
public ContainerReportHandler(final NodeManager nodeManager,
- final PipelineManager pipelineManager,
- final ContainerManager containerManager,
- final ReplicationActivityStatus replicationActivityStatus) {
- Preconditions.checkNotNull(nodeManager);
- Preconditions.checkNotNull(pipelineManager);
- Preconditions.checkNotNull(containerManager);
- Preconditions.checkNotNull(replicationActivityStatus);
+ final ContainerManager containerManager) {
+ super(containerManager, LOG);
this.nodeManager = nodeManager;
- this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
- this.replicationStatus = replicationActivityStatus;
}
+ /**
+ * Process the container reports from datanodes.
+ *
+ * @param reportFromDatanode Container Report
+ * @param publisher EventPublisher reference
+ */
@Override
public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
- final EventPublisher publisher) {
+ final EventPublisher publisher) {
final DatanodeDetails datanodeDetails =
reportFromDatanode.getDatanodeDetails();
-
final ContainerReportsProto containerReport =
reportFromDatanode.getReport();
try {
+ final List<ContainerReplicaProto> replicas =
+ containerReport.getReportsList();
+ final Set<ContainerID> containersInSCM =
+ nodeManager.getContainers(datanodeDetails);
- final List<ContainerReplicaProto> replicas = containerReport
- .getReportsList();
-
- // ContainerIDs which SCM expects this datanode to have.
- final Set<ContainerID> expectedContainerIDs = nodeManager
- .getContainers(datanodeDetails);
-
- // ContainerIDs that this datanode actually has.
- final Set<ContainerID> actualContainerIDs = replicas.parallelStream()
+ final Set<ContainerID> containersInDn = replicas.parallelStream()
.map(ContainerReplicaProto::getContainerID)
.map(ContainerID::valueof).collect(Collectors.toSet());
- // Container replicas which SCM is not aware of.
- final Set<ContainerID> newReplicas =
- new HashSet<>(actualContainerIDs);
- newReplicas.removeAll(expectedContainerIDs);
-
- // Container replicas which are missing from datanode.
- final Set<ContainerID> missingReplicas =
- new HashSet<>(expectedContainerIDs);
- missingReplicas.removeAll(actualContainerIDs);
-
- processContainerReplicas(datanodeDetails, replicas, publisher);
-
- // Remove missing replica from ContainerManager
- for (ContainerID id : missingReplicas) {
- try {
- containerManager.getContainerReplicas(id)
- .stream()
- .filter(replica ->
- replica.getDatanodeDetails().equals(datanodeDetails))
- .findFirst()
- .ifPresent(replica -> {
- try {
- containerManager.removeContainerReplica(id, replica);
- } catch (ContainerNotFoundException |
- ContainerReplicaNotFoundException e) {
- // This should not happen, but even if it happens, not an
- // issue
- }
- });
- } catch (ContainerNotFoundException e) {
- LOG.warn("Cannot remove container replica, container {} not found {}",
- id, e);
- }
- }
+ final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
+ missingReplicas.removeAll(containersInDn);
- // Update the latest set of containers for this datanode in NodeManager.
- nodeManager.setContainers(datanodeDetails, actualContainerIDs);
+ processContainerReplicas(datanodeDetails, replicas);
+ processMissingReplicas(datanodeDetails, missingReplicas);
+ updateDeleteTransaction(datanodeDetails, replicas, publisher);
- // Replicate if needed.
- newReplicas.forEach(id -> checkReplicationState(id, publisher));
- missingReplicas.forEach(id -> checkReplicationState(id, publisher));
+ /*
+ * Update the latest set of containers for this datanode in
+ * NodeManager
+ */
+ nodeManager.setContainers(datanodeDetails, containersInDn);
} catch (NodeNotFoundException ex) {
LOG.error("Received container report from unknown datanode {} {}",
@@ -146,68 +111,89 @@ public class ContainerReportHandler implements
}
+ /**
+ * Processes the ContainerReport.
+ *
+ * @param datanodeDetails Datanode from which this report was received
+ * @param replicas list of ContainerReplicaProto
+ */
private void processContainerReplicas(final DatanodeDetails datanodeDetails,
- final List<ContainerReplicaProto> replicas,
- final EventPublisher publisher) {
- final PendingDeleteStatusList pendingDeleteStatusList =
- new PendingDeleteStatusList(datanodeDetails);
+ final List<ContainerReplicaProto> replicas) {
for (ContainerReplicaProto replicaProto : replicas) {
try {
- final ContainerID containerID = ContainerID.valueof(
- replicaProto.getContainerID());
-
- ReportHandlerHelper.processContainerReplica(containerManager,
- containerID, replicaProto, datanodeDetails, publisher, LOG);
-
- final ContainerInfo containerInfo = containerManager
- .getContainer(containerID);
-
- if (containerInfo.getDeleteTransactionId() >
- replicaProto.getDeleteTransactionId()) {
- pendingDeleteStatusList
- .addPendingDeleteStatus(replicaProto.getDeleteTransactionId(),
- containerInfo.getDeleteTransactionId(),
- containerInfo.getContainerID());
- }
+ processContainerReplica(datanodeDetails, replicaProto);
} catch (ContainerNotFoundException e) {
- LOG.error("Received container report for an unknown container {} from"
- + " datanode {} {}", replicaProto.getContainerID(),
+ LOG.error("Received container report for an unknown container" +
+ " {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
} catch (IOException e) {
- LOG.error("Exception while processing container report for container"
- + " {} from datanode {} {}", replicaProto.getContainerID(),
+ LOG.error("Exception while processing container report for container" +
+ " {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
}
}
- if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
- publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
- pendingDeleteStatusList);
- }
}
- private void checkReplicationState(ContainerID containerID,
- EventPublisher publisher) {
- try {
- ContainerInfo container = containerManager.getContainer(containerID);
- replicateIfNeeded(container, publisher);
- } catch (ContainerNotFoundException ex) {
- LOG.warn("Container is missing from containerStateManager. Can't request "
- + "replication. {} {}", containerID, ex);
+ /**
+ * Processes the missing replicas on the given datanode.
+ *
+ * @param datanodeDetails DatanodeDetails
+ * @param missingReplicas ContainerIDs which are missing on the given datanode
+ */
+ private void processMissingReplicas(final DatanodeDetails datanodeDetails,
+ final Set<ContainerID> missingReplicas) {
+ for (ContainerID id : missingReplicas) {
+ try {
+ containerManager.getContainerReplicas(id).stream()
+ .filter(replica -> replica.getDatanodeDetails()
+ .equals(datanodeDetails)).findFirst()
+ .ifPresent(replica -> {
+ try {
+ containerManager.removeContainerReplica(id, replica);
+ } catch (ContainerNotFoundException |
+ ContainerReplicaNotFoundException ignored) {
+ // This should not happen, but even if it happens, not an issue
+ }
+ });
+ } catch (ContainerNotFoundException e) {
+ LOG.warn("Cannot remove container replica, container {} not found.",
+ id, e);
+ }
}
-
}
- private void replicateIfNeeded(ContainerInfo container,
- EventPublisher publisher) throws ContainerNotFoundException {
- if (!container.isOpen() && replicationStatus.isReplicationEnabled()) {
- final int existingReplicas = containerManager
- .getContainerReplicas(container.containerID()).size();
- final int expectedReplicas = container.getReplicationFactor().getNumber();
- if (existingReplicas != expectedReplicas) {
- publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- new ReplicationRequest(container.getContainerID(),
- existingReplicas, expectedReplicas));
+ /**
+ * Updates the Delete Transaction Id for the given datanode.
+ *
+ * @param datanodeDetails DatanodeDetails
+ * @param replicas List of ContainerReplicaProto
+ * @param publisher EventPublisher reference
+ */
+ private void updateDeleteTransaction(final DatanodeDetails datanodeDetails,
+ final List<ContainerReplicaProto> replicas,
+ final EventPublisher publisher) {
+ final PendingDeleteStatusList pendingDeleteStatusList =
+ new PendingDeleteStatusList(datanodeDetails);
+ for (ContainerReplicaProto replica : replicas) {
+ try {
+ final ContainerInfo containerInfo = containerManager.getContainer(
+ ContainerID.valueof(replica.getContainerID()));
+ if (containerInfo.getDeleteTransactionId() >
+ replica.getDeleteTransactionId()) {
+ pendingDeleteStatusList.addPendingDeleteStatus(
+ replica.getDeleteTransactionId(),
+ containerInfo.getDeleteTransactionId(),
+ containerInfo.getContainerID());
+ }
+ } catch (ContainerNotFoundException cnfe) {
+ LOG.warn("Cannot update pending delete transaction for " +
+ "container #{}. Reason: container missing.",
+ replica.getContainerID());
}
}
+ if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+ publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+ pendingDeleteStatusList);
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 4af8678..a37bf33 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -310,20 +310,19 @@ public class ContainerStateManager {
*
* @param containerID - ContainerID
* @param event - LifeCycle Event
- * @return Updated ContainerInfo.
* @throws SCMException on Failure.
*/
- ContainerInfo updateContainerState(final ContainerID containerID,
+ void updateContainerState(final ContainerID containerID,
final HddsProtos.LifeCycleEvent event)
throws SCMException, ContainerNotFoundException {
final ContainerInfo info = containers.getContainerInfo(containerID);
try {
+ final LifeCycleState oldState = info.getState();
final LifeCycleState newState = stateMachine.getNextState(
info.getState(), event);
containers.updateState(containerID, info.getState(), newState);
containerStateCount.incrementAndGet(newState);
- containerStateCount.decrementAndGet(info.getState());
- return containers.getContainerInfo(containerID);
+ containerStateCount.decrementAndGet(oldState);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update container state %s, " +
"reason: invalid state transition from state: %s upon " +
@@ -335,18 +334,6 @@ public class ContainerStateManager {
}
/**
- * Update the container State.
- * @param info - Container Info
- * @return ContainerInfo
- * @throws SCMException - on Error.
- */
- ContainerInfo updateContainerInfo(final ContainerInfo info)
- throws ContainerNotFoundException {
- containers.updateContainerInfo(info);
- return containers.getContainerInfo(info.containerID());
- }
-
- /**
* Update deleteTransactionId for a container.
*
* @param deleteTransactionMap maps containerId to its new
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index d70edfb..042fd56 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -18,55 +18,40 @@
package org.apache.hadoop.hdds.scm.container;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.server
- .SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/**
* Handles incremental container reports from datanode.
*/
-public class IncrementalContainerReportHandler implements
- EventHandler<IncrementalContainerReportFromDatanode> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(IncrementalContainerReportHandler.class);
+public class IncrementalContainerReportHandler extends
+ AbstractContainerReportHandler
+ implements EventHandler<IncrementalContainerReportFromDatanode> {
- private final PipelineManager pipelineManager;
- private final ContainerManager containerManager;
+ private static final Logger LOG = LoggerFactory.getLogger(
+ IncrementalContainerReportHandler.class);
public IncrementalContainerReportHandler(
- final PipelineManager pipelineManager,
final ContainerManager containerManager) {
- Preconditions.checkNotNull(pipelineManager);
- Preconditions.checkNotNull(containerManager);
- this.pipelineManager = pipelineManager;
- this.containerManager = containerManager;
+ super(containerManager, LOG);
}
@Override
- public void onMessage(
- final IncrementalContainerReportFromDatanode containerReportFromDatanode,
- final EventPublisher publisher) {
+ public void onMessage(final IncrementalContainerReportFromDatanode report,
+ final EventPublisher publisher) {
for (ContainerReplicaProto replicaProto :
- containerReportFromDatanode.getReport().getReportList()) {
+ report.getReport().getReportList()) {
try {
- final DatanodeDetails datanodeDetails = containerReportFromDatanode
- .getDatanodeDetails();
- final ContainerID containerID = ContainerID
- .valueof(replicaProto.getContainerID());
- ReportHandlerHelper.processContainerReplica(containerManager,
- containerID, replicaProto, datanodeDetails, publisher, LOG);
+ processContainerReplica(report.getDatanodeDetails(), replicaProto);
} catch (ContainerNotFoundException e) {
LOG.warn("Container {} not found!", replicaProto.getContainerID());
} catch (IOException e) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 97c600b..1dce81b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -170,6 +170,15 @@ public class ReplicationManager {
}
/**
+ * Returns true if the Replication Monitor Thread is running.
+ *
+ * @return the value of {@code replicationMonitor.isAlive()}: true if running, false otherwise
+ */
+ public boolean isRunning() {
+ return replicationMonitor.isAlive();
+ }
+
+ /**
* Process all the containers immediately.
*/
@VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
deleted file mode 100644
index c566ca9..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
-
-/**
- * Helper functions to handler container reports.
- */
-public final class ReportHandlerHelper {
-
- private ReportHandlerHelper() {}
-
- /**
- * Processes the container replica and updates the container state in SCM.
- * If needed, sends command to datanode to update the replica state.
- *
- * @param containerManager ContainerManager instance
- * @param containerId Id of the container
- * @param replicaProto replica of the container
- * @param datanodeDetails datanode where the replica resides
- * @param publisher event publisher
- * @param logger for logging
- * @throws IOException
- */
- static void processContainerReplica(final ContainerManager containerManager,
- final ContainerID containerId, final ContainerReplicaProto replicaProto,
- final DatanodeDetails datanodeDetails, final EventPublisher publisher,
- final Logger logger) throws IOException {
-
- final ContainerReplica replica = ContainerReplica.newBuilder()
- .setContainerID(containerId)
- .setContainerState(replicaProto.getState())
- .setDatanodeDetails(datanodeDetails)
- .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
- .setSequenceId(replicaProto.getBlockCommitSequenceId())
- .build();
-
- // This is an in-memory update.
- containerManager.updateContainerReplica(containerId, replica);
- ReportHandlerHelper.reconcileContainerState(containerManager,
- containerId, publisher, logger);
-
- final ContainerInfo containerInfo = containerManager
- .getContainer(containerId);
- if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
- containerInfo.setUsedBytes(replicaProto.getUsed());
- }
-
- if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
- containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
- }
-
- // Now we have reconciled the container state. If the container state and
- // the replica state doesn't match, then take appropriate action.
- ReportHandlerHelper.sendReplicaCommands(
- datanodeDetails, containerInfo, replica, publisher, logger);
- }
-
-
- /**
- * Reconcile the container state based on the ContainerReplica states.
- * ContainerState is updated after the reconciliation.
- *
- * @param manager ContainerManager
- * @param containerId container id
- * @throws ContainerNotFoundException
- */
- private static void reconcileContainerState(final ContainerManager manager,
- final ContainerID containerId, final EventPublisher publisher,
- final Logger logger) throws IOException {
- // TODO: handle unhealthy replica.
- synchronized (manager.getContainer(containerId)) {
- final ContainerInfo container = manager.getContainer(containerId);
- final Set<ContainerReplica> replicas = manager.getContainerReplicas(
- containerId);
- final LifeCycleState containerState = container.getState();
- switch (containerState) {
- case OPEN:
- /*
- * If the container state is OPEN.
- * None of the replica should be in any other state.
- *
- */
- List<ContainerReplica> invalidReplicas = replicas.stream()
- .filter(replica -> replica.getState() != State.OPEN)
- .collect(Collectors.toList());
- if (!invalidReplicas.isEmpty()) {
- logger.warn("Container {} has invalid replica state." +
- "Invalid Replicas: {}", containerId, invalidReplicas);
- }
- // A container cannot be over replicated when in OPEN state.
- break;
- case CLOSING:
- /*
- * SCM has asked DataNodes to close the container. Now the replicas
- * can be in any of the following states.
- *
- * 1. OPEN
- * 2. CLOSING
- * 3. QUASI_CLOSED
- * 4. CLOSED
- *
- * If all the replica are either in OPEN or CLOSING state, do nothing.
- *
- * If any one of the replica is in QUASI_CLOSED state, move the
- * container to QUASI_CLOSED state.
- *
- * If any one of the replica is in CLOSED state, mark the container as
- * CLOSED. The close has happened via Ratis.
- *
- */
- Optional<ContainerReplica> closedReplica = replicas.stream()
- .filter(replica -> replica.getState() == State.CLOSED)
- .findFirst();
- if (closedReplica.isPresent()) {
- container.updateSequenceId(closedReplica.get().getSequenceId());
- manager.updateContainerState(
- containerId, HddsProtos.LifeCycleEvent.CLOSE);
-
- // TODO: remove container from OPEN pipeline, since the container is
- // closed we can go ahead and remove it from Ratis pipeline.
- } else if (replicas.stream()
- .anyMatch(replica -> replica.getState() == State.QUASI_CLOSED)) {
- manager.updateContainerState(
- containerId, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
- }
- break;
- case QUASI_CLOSED:
- /*
- * The container is in QUASI_CLOSED state, this means that at least
- * one of the replica is in QUASI_CLOSED/CLOSED state.
- * Other replicas can be in any of the following state.
- *
- * 1. OPEN
- * 2. CLOSING
- * 3. QUASI_CLOSED
- * 4. CLOSED
- *
- * If <50% of container replicas are in QUASI_CLOSED state and all
- * the other replica are either in OPEN or CLOSING state, do nothing.
- * We cannot identify the correct replica since we don't have quorum
- * yet.
- *
- * If >50% (quorum) of replicas are in QUASI_CLOSED state and other
- * replicas are either in OPEN or CLOSING state, try to identify
- * the latest container replica using originNodeId and sequenceId.
- * Force close those replica(s) which have the latest sequenceId.
- *
- * If at least one of the replica is in CLOSED state, mark the
- * container as CLOSED. Force close the replicas which matches the
- * sequenceId of the CLOSED replica.
- *
- */
- if (replicas.stream()
- .anyMatch(replica -> replica.getState() == State.CLOSED)) {
- manager.updateContainerState(
- containerId, HddsProtos.LifeCycleEvent.FORCE_CLOSE);
- // TODO: remove container from OPEN pipeline, since the container is
- // closed we can go ahead and remove it from Ratis pipeline.
- } else {
- final int replicationFactor = container
- .getReplicationFactor().getNumber();
- final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
- .filter(replica -> replica.getState() == State.QUASI_CLOSED)
- .collect(Collectors.toList());
- final long uniqueQuasiClosedReplicaCount = quasiClosedReplicas
- .stream()
- .map(ContainerReplica::getOriginDatanodeId)
- .distinct()
- .count();
-
- if (uniqueQuasiClosedReplicaCount > (replicationFactor / 2)) {
- // Quorum of unique replica has been QUASI_CLOSED
- long sequenceId = forceCloseContainerReplicaWithHighestSequenceId(
- container, quasiClosedReplicas, publisher);
- if (sequenceId != -1L) {
- container.updateSequenceId(sequenceId);
- }
- }
- }
- break;
- case CLOSED:
- /*
- * The container is already in closed state. do nothing.
- */
- break;
- case DELETING:
- // Not handled.
- throw new UnsupportedOperationException("Unsupported container state" +
- " 'DELETING'.");
- case DELETED:
- // Not handled.
- throw new UnsupportedOperationException("Unsupported container state" +
- " 'DELETED'.");
- default:
- break;
- }
- }
- }
-
- /**
- * Compares the QUASI_CLOSED replicas of a container and sends close command.
- *
- * @param quasiClosedReplicas list of quasi closed replicas
- * @return the sequenceId of the closed replica.
- */
- private static long forceCloseContainerReplicaWithHighestSequenceId(
- final ContainerInfo container,
- final List<ContainerReplica> quasiClosedReplicas,
- final EventPublisher publisher) {
-
- final long highestSequenceId = quasiClosedReplicas.stream()
- .map(ContainerReplica::getSequenceId)
- .max(Long::compare)
- .orElse(-1L);
-
- if (highestSequenceId != -1L) {
- quasiClosedReplicas.stream()
- .filter(replica -> replica.getSequenceId() == highestSequenceId)
- .forEach(replica -> {
- CloseContainerCommand closeContainerCommand =
- new CloseContainerCommand(container.getContainerID(),
- container.getPipelineID(), true);
- publisher.fireEvent(DATANODE_COMMAND,
- new CommandForDatanode<>(
- replica.getDatanodeDetails().getUuid(),
- closeContainerCommand));
- });
- }
- return highestSequenceId;
- }
-
- /**
- * Based on the container and replica state, send command to datanode if
- * required.
- *
- * @param datanodeDetails datanode where the replica resides
- * @param containerInfo container information
- * @param replica replica information
- * @param publisher queue to publish the datanode command event
- * @param log for logging
- */
- static void sendReplicaCommands(
- final DatanodeDetails datanodeDetails,
- final ContainerInfo containerInfo,
- final ContainerReplica replica,
- final EventPublisher publisher,
- final Logger log) {
- final HddsProtos.LifeCycleState containerState = containerInfo.getState();
- final ContainerReplicaProto.State replicaState = replica.getState();
-
- if(!ReportHandlerHelper.compareState(containerState, replicaState)) {
- if (containerState == HddsProtos.LifeCycleState.OPEN) {
- // When a container state in SCM is OPEN, there is no way a datanode
- // can quasi close/close the container.
- log.warn("Invalid container replica state for container {}" +
- " from datanode {}. Expected state is OPEN.",
- containerInfo.containerID(), datanodeDetails);
- // The replica can go CORRUPT, we have to handle it.
- }
- if (containerState == HddsProtos.LifeCycleState.CLOSING ||
- containerState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
- // Resend close container event for this datanode if the container
- // replica state is OPEN/CLOSING.
- if (replicaState == ContainerReplicaProto.State.OPEN ||
- replicaState == ContainerReplicaProto.State.CLOSING) {
- CloseContainerCommand closeContainerCommand =
- new CloseContainerCommand(containerInfo.getContainerID(),
- containerInfo.getPipelineID());
- publisher.fireEvent(DATANODE_COMMAND,
- new CommandForDatanode<>(
- replica.getDatanodeDetails().getUuid(),
- closeContainerCommand));
- }
- }
- if (containerState == HddsProtos.LifeCycleState.CLOSED) {
- if (replicaState == ContainerReplicaProto.State.OPEN ||
- replicaState == ContainerReplicaProto.State.CLOSING ||
- replicaState == ContainerReplicaProto.State.QUASI_CLOSED) {
- // Send force close container event for this datanode if the container
- // replica state is OPEN/CLOSING/QUASI_CLOSED.
-
- // Close command will be send only if this replica matches the
- // sequence of the container.
- if (containerInfo.getSequenceId() ==
- replica.getSequenceId()) {
- CloseContainerCommand closeContainerCommand =
- new CloseContainerCommand(containerInfo.getContainerID(),
- containerInfo.getPipelineID(), true);
- publisher.fireEvent(DATANODE_COMMAND,
- new CommandForDatanode<>(
- replica.getDatanodeDetails().getUuid(),
- closeContainerCommand));
- }
- // TODO: delete the replica if the BCSID doesn't match.
- }
- }
- }
-
- }
-
- /**
- * Compares the container and replica state.
- *
- * @param containerState container state
- * @param replicaState replica state
- * @return true if the states are same, else false
- */
- private static boolean compareState(final LifeCycleState containerState,
- final State replicaState) {
- // TODO: handle unhealthy replica.
- switch (containerState) {
- case OPEN:
- return replicaState == State.OPEN;
- case CLOSING:
- return replicaState == State.CLOSING;
- case QUASI_CLOSED:
- return replicaState == State.QUASI_CLOSED;
- case CLOSED:
- return replicaState == State.CLOSED;
- case DELETING:
- return false;
- case DELETED:
- return false;
- default:
- return false;
- }
- }
-
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index fe52669..1fa8395 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -295,18 +295,20 @@ public class SCMContainerManager implements ContainerManager {
// Should we return the updated ContainerInfo instead of LifeCycleState?
lock.lock();
try {
- ContainerInfo container = containerStateManager.getContainer(containerID);
- ContainerInfo updatedContainer =
- updateContainerStateInternal(containerID, event);
- if (updatedContainer.getState() != LifeCycleState.OPEN
- && container.getState() == LifeCycleState.OPEN) {
+ final ContainerInfo container = containerStateManager
+ .getContainer(containerID);
+ final LifeCycleState oldState = container.getState();
+ containerStateManager.updateContainerState(containerID, event);
+ final LifeCycleState newState = container.getState();
+
+ if (oldState == LifeCycleState.OPEN && newState != LifeCycleState.OPEN) {
pipelineManager
- .removeContainerFromPipeline(updatedContainer.getPipelineID(),
+ .removeContainerFromPipeline(container.getPipelineID(),
containerID);
}
final byte[] dbKey = Longs.toByteArray(containerID.getId());
- containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
- return updatedContainer.getState();
+ containerStore.put(dbKey, container.getProtobuf().toByteArray());
+ return newState;
} catch (ContainerNotFoundException cnfe) {
throw new SCMException(
"Failed to update container state"
@@ -318,11 +320,6 @@ public class SCMContainerManager implements ContainerManager {
}
}
- private ContainerInfo updateContainerStateInternal(ContainerID containerID,
- HddsProtos.LifeCycleEvent event) throws IOException {
- return containerStateManager.updateContainerState(containerID, event);
- }
-
/**
* Update deleteTransactionId according to deleteTransactionMap.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 2aba724..7411055 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -296,8 +296,7 @@ public class ContainerStateMap {
checkIfContainerExist(containerID);
final ContainerInfo currentInfo = containerMap.get(containerID);
try {
- final ContainerInfo newInfo = new ContainerInfo(currentInfo);
- newInfo.setState(newState);
+ currentInfo.setState(newState);
// We are updating two places before this update is done, these can
// fail independently, since the code needs to handle it.
@@ -309,13 +308,12 @@ public class ContainerStateMap {
// roll back the earlier change we did. If the rollback fails, we can
// be in an inconsistent state,
- containerMap.put(containerID, newInfo);
lifeCycleStateMap.update(currentState, newState, containerID);
LOG.trace("Updated the container {} to new state. Old = {}, new = " +
"{}", containerID, currentState, newState);
// Just flush both old and new data sets from the result cache.
- flushCache(currentInfo, newInfo);
+ flushCache(currentInfo);
} catch (SCMException ex) {
LOG.error("Unable to update the container state. {}", ex);
// we need to revert the change in this attribute since we are not
@@ -324,7 +322,7 @@ public class ContainerStateMap {
"old state. Old = {}, Attempted state = {}", currentState,
newState);
- containerMap.put(containerID, currentInfo);
+ currentInfo.setState(currentState);
// if this line throws, the state map can be in an inconsistent
// state, since we will have modified the attribute by the
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
index 9c955033..a6b0704 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -23,7 +23,7 @@ package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index b51b537..79565f0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -55,8 +55,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@@ -99,7 +98,6 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.utils.HddsVersionInfo;
-import org.apache.hadoop.utils.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -176,7 +174,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private SCMMetadataStore scmMetadataStore;
private final EventQueue eventQueue;
- private final Scheduler commonScheduler;
/*
* HTTP endpoint for JMX access.
*/
@@ -199,7 +196,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final LeaseManager<Long> commandWatcherLeaseManager;
- private final ReplicationActivityStatus replicationStatus;
private SCMChillModeManager scmChillModeManager;
private CertificateServer certificateServer;
@@ -287,8 +283,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
watcherTimeout);
initalizeSystemManagers(conf, configurator);
- commonScheduler = new Scheduler("SCMCommonScheduler", false, 1);
- replicationStatus = new ReplicationActivityStatus(commonScheduler);
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, containerManager);
@@ -311,12 +305,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
- new ContainerReportHandler(scmNodeManager, pipelineManager,
- containerManager, replicationStatus);
+ new ContainerReportHandler(scmNodeManager, containerManager);
IncrementalContainerReportHandler incrementalContainerReportHandler =
- new IncrementalContainerReportHandler(
- pipelineManager, containerManager);
+ new IncrementalContainerReportHandler(containerManager);
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, conf);
@@ -343,7 +335,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
httpServer = new StorageContainerManagerHttpServer(conf);
chillModeHandler = new ChillModeHandler(configuration,
- clientProtocolServer, scmBlockManager, replicationStatus);
+ clientProtocolServer, scmBlockManager, replicationManager);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
@@ -422,8 +414,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
if (configurator.getReplicationManager() != null) {
replicationManager = configurator.getReplicationManager();
} else {
- replicationManager = new ReplicationManager(containerPlacementPolicy,
- containerManager, eventQueue, commandWatcherLeaseManager);
+ replicationManager = new ReplicationManager(conf,
+ containerManager, containerPlacementPolicy, eventQueue);
}
if(configurator.getScmChillModeManager() != null) {
scmChillModeManager = configurator.getScmChillModeManager();
@@ -917,8 +909,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
httpServer.start();
scmBlockManager.start();
- replicationStatus.start();
- replicationManager.start();
// Start jvm monitor
jvmPauseMonitor = new JvmPauseMonitor();
@@ -934,14 +924,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
public void stop() {
try {
- LOG.info("Stopping Replication Activity Status tracker.");
- replicationStatus.close();
- } catch (Exception ex) {
- LOG.error("Replication Activity Status tracker stop failed.", ex);
- }
-
-
- try {
LOG.info("Stopping Replication Manager Service.");
replicationManager.stop();
} catch (Exception ex) {
@@ -1017,13 +999,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
LOG.error("SCM Event Queue stop failed", ex);
}
- try {
- LOG.info("Stopping SCM Common Scheduler.");
- commonScheduler.close();
- } catch (Exception ex) {
- LOG.error("SCM Common Scheduler close failed {}", ex);
- }
-
if (jvmPauseMonitor != null) {
jvmPauseMonitor.stop();
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 19c35fd..d61924a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -533,6 +533,7 @@ public final class TestUtils {
.setReplicationType(HddsProtos.ReplicationType.RATIS)
.setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
.setState(state)
+ .setSequenceId(10000L)
.setOwner("TEST")
.build();
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java
index efd69fd..7c9f98e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java
@@ -22,16 +22,20 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.Scheduler;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.HashSet;
+
/**
* Tests ChillModeHandler behavior.
*/
@@ -40,7 +44,7 @@ public class TestChillModeHandler {
private OzoneConfiguration configuration;
private SCMClientProtocolServer scmClientProtocolServer;
- private ReplicationActivityStatus replicationActivityStatus;
+ private ReplicationManager replicationManager;
private BlockManager blockManager;
private ChillModeHandler chillModeHandler;
private EventQueue eventQueue;
@@ -54,15 +58,19 @@ public class TestChillModeHandler {
"3s");
scmClientProtocolServer =
Mockito.mock(SCMClientProtocolServer.class);
- replicationActivityStatus = new ReplicationActivityStatus(
- new Scheduler("SCMCommonScheduler", false, 1));
+ eventQueue = new EventQueue();
+ final ContainerManager containerManager =
+ Mockito.mock(ContainerManager.class);
+ Mockito.when(containerManager.getContainerIDs())
+ .thenReturn(new HashSet<>());
+ replicationManager = new ReplicationManager(configuration,
+ containerManager, Mockito.mock(ContainerPlacementPolicy.class),
+ eventQueue);
blockManager = Mockito.mock(BlockManagerImpl.class);
chillModeHandler =
new ChillModeHandler(configuration, scmClientProtocolServer,
- blockManager, replicationActivityStatus);
+ blockManager, replicationManager);
-
- eventQueue = new EventQueue();
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
chillModeStatus = new SCMChillModeManager.ChillModeStatus(false);
@@ -82,7 +90,7 @@ public class TestChillModeHandler {
Assert.assertFalse(scmClientProtocolServer.getChillModeStatus());
Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode());
GenericTestUtils.waitFor(() ->
- replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
+ replicationManager.isRunning(), 1000, 5000);
}
@@ -99,6 +107,6 @@ public class TestChillModeHandler {
Assert.assertFalse(scmClientProtocolServer.getChillModeStatus());
Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode());
GenericTestUtils.waitFor(() ->
- replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
+ replicationManager.isRunning(), 1000, 5000);
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 0b7cae4..41585bc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -16,124 +16,146 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.container.replication
- .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.utils.Scheduler;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
import java.io.IOException;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hadoop.hdds.scm.TestUtils
- .getReplicas;
-import static org.apache.hadoop.hdds.scm.TestUtils
- .getContainer;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.addContainerToContainerManager;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.mockUpdateContainerReplica;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.mockUpdateContainerState;
+import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
/**
* Test the behaviour of the ContainerReportHandler.
*/
public class TestContainerReportHandler {
- private static Scheduler scheduler;
+ private NodeManager nodeManager;
+ private ContainerManager containerManager;
+ private ContainerStateManager containerStateManager;
+ private EventPublisher publisher;
+
+ @Before
+ public void setup() throws IOException {
+ final Configuration conf = new OzoneConfiguration();
+ this.nodeManager = new MockNodeManager(true, 10);
+ this.containerManager = Mockito.mock(ContainerManager.class);
+ this.containerStateManager = new ContainerStateManager(conf);
+ this.publisher = Mockito.mock(EventPublisher.class);
+
+
+ Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+ .thenAnswer(invocation -> containerStateManager
+ .getContainer((ContainerID)invocation.getArguments()[0]));
+
+ Mockito.when(containerManager.getContainerReplicas(
+ Mockito.any(ContainerID.class)))
+ .thenAnswer(invocation -> containerStateManager
+ .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+
+ Mockito.doAnswer(invocation -> {
+ containerStateManager
+ .updateContainerState((ContainerID)invocation.getArguments()[0],
+ (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
+ return null;
+ }).when(containerManager).updateContainerState(
+ Mockito.any(ContainerID.class),
+ Mockito.any(HddsProtos.LifeCycleEvent.class));
+
+ Mockito.doAnswer(invocation -> {
+ containerStateManager.updateContainerReplica(
+ (ContainerID) invocation.getArguments()[0],
+ (ContainerReplica) invocation.getArguments()[1]);
+ return null;
+ }).when(containerManager).updateContainerReplica(
+ Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+
+ Mockito.doAnswer(invocation -> {
+ containerStateManager.removeContainerReplica(
+ (ContainerID) invocation.getArguments()[0],
+ (ContainerReplica) invocation.getArguments()[1]);
+ return null;
+ }).when(containerManager).removeContainerReplica(
+ Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
- @BeforeClass
- public static void setup() {
- scheduler = new Scheduler("SCMCommonScheduler", false, 1);
}
- @AfterClass
- public static void tearDown() {
- scheduler.close();
+ @After
+ public void tearDown() throws IOException {
+ containerStateManager.close();
}
@Test
public void testUnderReplicatedContainer()
- throws NodeNotFoundException, ContainerNotFoundException,
- ContainerReplicaNotFoundException {
-
- final NodeManager nodeManager = new MockNodeManager(true, 10);
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final ReplicationActivityStatus replicationActivityStatus =
- new ReplicationActivityStatus(scheduler);
- replicationActivityStatus.enableReplication();
+ throws NodeNotFoundException, ContainerNotFoundException, SCMException {
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, pipelineManager, containerManager,
- replicationActivityStatus);
+ nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
+
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
- final Set<ContainerReplica> containerOneReplicas = getReplicas(
- containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeOne, datanodeTwo, datanodeThree);
- final Set<ContainerReplica> containerTwoReplicas = getReplicas(
- containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeOne, datanodeTwo, datanodeThree);
nodeManager.setContainers(datanodeOne, containerIDSet);
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- addContainerToContainerManager(
- containerManager, containerOne, containerOneReplicas);
- addContainerToContainerManager(
- containerManager, containerTwo, containerTwoReplicas);
+ containerStateManager.loadContainer(containerOne);
+ containerStateManager.loadContainer(containerTwo);
- Mockito.doAnswer((Answer<Void>) invocation -> {
- Object[] args = invocation.getArguments();
- if (args[0].equals(containerOne.containerID())) {
- ContainerReplica replica = (ContainerReplica) args[1];
- containerOneReplicas.remove(replica);
- }
- return null;
- }).when(containerManager).removeContainerReplica(
- Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+ getReplicas(containerOne.containerID(),
+ ContainerReplicaProto.State.CLOSED,
+ datanodeOne, datanodeTwo, datanodeThree)
+ .forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerOne.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+
+ }
+ });
+
+ getReplicas(containerTwo.containerID(),
+ ContainerReplicaProto.State.CLOSED,
+ datanodeOne, datanodeTwo, datanodeThree)
+ .forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerTwo.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+ }
+ });
- Mockito.when(
- containerManager.getContainerReplicas(containerOne.containerID()))
- .thenReturn(containerOneReplicas);
- Mockito.when(
- containerManager.getContainerReplicas(containerTwo.containerID()))
- .thenReturn(containerTwoReplicas);
// SCM expects both containerOne and containerTwo to be in all the three
// datanodes datanodeOne, datanodeTwo and datanodeThree
@@ -145,70 +167,65 @@ public class TestContainerReportHandler {
final ContainerReportsProto containerReport = getContainerReportsProto(
containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
+ Assert.assertEquals(2, containerManager.getContainerReplicas(
+ containerOne.containerID()).size());
- // Now we should get a replication request for containerOne
- Mockito.verify(publisher, Mockito.times(1))
- .fireEvent(Mockito.any(), Mockito.any());
-
- // TODO: verify whether are actually getting a replication request event
- // for containerOne
}
@Test
public void testOverReplicatedContainer() throws NodeNotFoundException,
- ContainerNotFoundException {
-
- final NodeManager nodeManager = new MockNodeManager(true, 10);
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final ReplicationActivityStatus replicationActivityStatus =
- new ReplicationActivityStatus(scheduler);
- replicationActivityStatus.enableReplication();
+ SCMException, ContainerNotFoundException {
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, pipelineManager, containerManager,
- replicationActivityStatus);
+ nodeManager, containerManager);
+
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final DatanodeDetails datanodeFour = nodeIterator.next();
+
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
- final Set<ContainerReplica> containerOneReplicas = getReplicas(
- containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeOne, datanodeTwo, datanodeThree);
- final Set<ContainerReplica> containerTwoReplicas = getReplicas(
- containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeOne, datanodeTwo, datanodeThree);
nodeManager.setContainers(datanodeOne, containerIDSet);
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- addContainerToContainerManager(
- containerManager, containerOne, containerOneReplicas);
- addContainerToContainerManager(
- containerManager, containerTwo, containerTwoReplicas);
+ containerStateManager.loadContainer(containerOne);
+ containerStateManager.loadContainer(containerTwo);
- Mockito.doAnswer((Answer<Void>) invocation -> {
- Object[] args = invocation.getArguments();
- if (args[0].equals(containerOne.containerID())) {
- containerOneReplicas.add((ContainerReplica) args[1]);
- }
- return null;
- }).when(containerManager).updateContainerReplica(
- Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+ getReplicas(containerOne.containerID(),
+ ContainerReplicaProto.State.CLOSED,
+ datanodeOne, datanodeTwo, datanodeThree)
+ .forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerOne.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+ }
+ });
+
+ getReplicas(containerTwo.containerID(),
+ ContainerReplicaProto.State.CLOSED,
+ datanodeOne, datanodeTwo, datanodeThree)
+ .forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerTwo.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+
+ }
+ });
// SCM expects both containerOne and containerTwo to be in all the three
@@ -220,114 +237,15 @@ public class TestContainerReportHandler {
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+ datanodeFour.getUuidString());
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeFour, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
- Mockito.verify(publisher, Mockito.times(1))
- .fireEvent(Mockito.any(), Mockito.any());
- // TODO: verify whether are actually getting a replication request event
- // for containerOne
+ Assert.assertEquals(4, containerManager.getContainerReplicas(
+ containerOne.containerID()).size());
}
- @Test
- public void testOpenToClosing()
- throws NodeNotFoundException, ContainerNotFoundException {
- /*
- * The container is in CLOSING state and all the replicas are either in
- * OPEN or CLOSING state.
- *
- * The datanode reports that the replica is still in OPEN state.
- *
- * In this case SCM should trigger close container event to the datanode.
- */
-
- final NodeManager nodeManager = new MockNodeManager(true, 10);
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final ReplicationActivityStatus replicationActivityStatus =
- new ReplicationActivityStatus(scheduler);
- replicationActivityStatus.enableReplication();
-
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
- final ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, pipelineManager, containerManager,
- replicationActivityStatus);
- final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
- NodeState.HEALTHY).iterator();
- final DatanodeDetails datanodeOne = nodeIterator.next();
- final DatanodeDetails datanodeTwo = nodeIterator.next();
- final DatanodeDetails datanodeThree = nodeIterator.next();
- final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
- final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
- final Set<ContainerID> containerIDSet = Stream.of(
- containerOne.containerID(), containerTwo.containerID())
- .collect(Collectors.toSet());
- final Set<ContainerReplica> containerOneReplicas = getReplicas(
- containerOne.containerID(), ContainerReplicaProto.State.OPEN,
- datanodeOne);
-
- containerOneReplicas.addAll(getReplicas(
- containerOne.containerID(), ContainerReplicaProto.State.CLOSING,
- datanodeTwo, datanodeThree));
-
- final Set<ContainerReplica> containerTwoReplicas = getReplicas(
- containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeOne, datanodeTwo, datanodeThree);
-
- nodeManager.setContainers(datanodeOne, containerIDSet);
- nodeManager.setContainers(datanodeTwo, containerIDSet);
- nodeManager.setContainers(datanodeThree, containerIDSet);
-
- addContainerToContainerManager(
- containerManager, containerOne, containerOneReplicas);
- addContainerToContainerManager(
- containerManager, containerTwo, containerTwoReplicas);
- mockUpdateContainerReplica(
- containerManager, containerOne, containerOneReplicas);
-
- // Replica in datanodeOne of containerOne is in OPEN state.
- final ContainerReportsProto containerReport = getContainerReportsProto(
- containerOne.containerID(), ContainerReplicaProto.State.OPEN,
- datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
- final ContainerReportFromDatanode containerReportFromDatanode =
- new ContainerReportFromDatanode(datanodeOne, containerReport);
- reportHandler.onMessage(containerReportFromDatanode, publisher);
-
- // Now we should get close container event for containerOne on datanodeOne
- Mockito.verify(publisher, Mockito.times(1))
- .fireEvent(Mockito.any(), Mockito.any());
-
- // TODO: verify whether are actually getting a close container
- // datanode command for containerOne/datanodeOne
-
- /*
- * The container is in CLOSING state and all the replicas are either in
- * OPEN or CLOSING state.
- *
- * The datanode reports that the replica is in CLOSING state.
- *
- * In this case SCM should trigger close container event to the datanode.
- */
-
- // Replica in datanodeOne of containerOne is in OPEN state.
- final ContainerReportsProto containerReportTwo = getContainerReportsProto(
- containerOne.containerID(), ContainerReplicaProto.State.OPEN,
- datanodeOne.getUuidString());
- final ContainerReportFromDatanode containerReportTwoFromDatanode =
- new ContainerReportFromDatanode(datanodeOne, containerReportTwo);
- reportHandler.onMessage(containerReportTwoFromDatanode, publisher);
-
- // Now we should get close container event for containerOne on datanodeOne
- Mockito.verify(publisher, Mockito.times(2))
- .fireEvent(Mockito.any(), Mockito.any());
-
- // TODO: verify whether are actually getting a close container
- // datanode command for containerOne/datanodeOne
- }
@Test
public void testClosingToClosed() throws NodeNotFoundException, IOException {
@@ -339,27 +257,23 @@ public class TestContainerReportHandler {
*
* In this case SCM should mark the container as CLOSED.
*/
- final NodeManager nodeManager = new MockNodeManager(true, 10);
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final ReplicationActivityStatus replicationActivityStatus =
- new ReplicationActivityStatus(scheduler);
- replicationActivityStatus.enableReplication();
-
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+
final ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, pipelineManager, containerManager,
- replicationActivityStatus);
+ nodeManager, containerManager);
+
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
+
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
+
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(),
ContainerReplicaProto.State.CLOSING,
@@ -379,25 +293,36 @@ public class TestContainerReportHandler {
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- addContainerToContainerManager(
- containerManager, containerOne, containerOneReplicas);
- addContainerToContainerManager(
- containerManager, containerTwo, containerTwoReplicas);
- mockUpdateContainerReplica(
- containerManager, containerOne, containerOneReplicas);
- mockUpdateContainerState(containerManager, containerOne,
- LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
+ containerStateManager.loadContainer(containerOne);
+ containerStateManager.loadContainer(containerTwo);
+
+ containerOneReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerOne.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+ // containerOne was loaded above, so this is presumably unreachable.
+ }
+ });
+
+ containerTwoReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerTwo.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+
+ }
+ });
+
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
- Assert.assertEquals(
- LifeCycleState.CLOSED, containerOne.getState());
+ Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
}
@Test
@@ -411,29 +336,23 @@ public class TestContainerReportHandler {
*
* In this case SCM should move the container to QUASI_CLOSED.
*/
- final NodeManager nodeManager = new MockNodeManager(true, 10);
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final ReplicationActivityStatus replicationActivityStatus =
- new ReplicationActivityStatus(scheduler);
- replicationActivityStatus.enableReplication();
-
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+
final ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, pipelineManager, containerManager,
- replicationActivityStatus);
+ nodeManager, containerManager);
+
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
- final ContainerInfo containerOne =
- getContainer(LifeCycleState.CLOSING);
- final ContainerInfo containerTwo =
- getContainer(LifeCycleState.CLOSED);
+
+ final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
+ final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
+
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(),
ContainerReplicaProto.State.CLOSING,
@@ -451,60 +370,65 @@ public class TestContainerReportHandler {
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- addContainerToContainerManager(
- containerManager, containerOne, containerOneReplicas);
- addContainerToContainerManager(
- containerManager, containerTwo, containerTwoReplicas);
- mockUpdateContainerReplica(
- containerManager, containerOne, containerOneReplicas);
- mockUpdateContainerState(containerManager, containerOne,
- LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
+ containerStateManager.loadContainer(containerOne);
+ containerStateManager.loadContainer(containerTwo);
+
+ containerOneReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerOne.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+ // containerOne was loaded above, so this is presumably unreachable.
+ }
+ });
+
+ containerTwoReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerTwo.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+
+ }
+ });
+
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
- Assert.assertEquals(
- LifeCycleState.QUASI_CLOSED, containerOne.getState());
+ Assert.assertEquals(LifeCycleState.QUASI_CLOSED, containerOne.getState());
}
@Test
- public void testQuasiClosedWithDifferentOriginNodeReplica()
+ public void testQuasiClosedToClosed()
throws NodeNotFoundException, IOException {
/*
* The container is in QUASI_CLOSED state.
* - One of the replica is in QUASI_CLOSED state
* - The other two replica are in OPEN/CLOSING state
*
- * The datanode reports the second replica is now QUASI_CLOSED.
+ * The datanode reports the second replica is now CLOSED.
*
- * In this case SCM should CLOSE the container with highest BCSID and
- * send force close command to the datanode.
+ * In this case SCM should CLOSE the container.
*/
- final NodeManager nodeManager = new MockNodeManager(true, 10);
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final ReplicationActivityStatus replicationActivityStatus =
- new ReplicationActivityStatus(scheduler);
- replicationActivityStatus.enableReplication();
-
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+
final ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, pipelineManager, containerManager,
- replicationActivityStatus);
+ nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
+
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
+
final ContainerInfo containerOne =
getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerInfo containerTwo =
getContainer(LifeCycleState.CLOSED);
+
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
@@ -526,139 +450,42 @@ public class TestContainerReportHandler {
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- addContainerToContainerManager(
- containerManager, containerOne, containerOneReplicas);
- addContainerToContainerManager(
- containerManager, containerTwo, containerTwoReplicas);
- mockUpdateContainerReplica(
- containerManager, containerOne, containerOneReplicas);
- mockUpdateContainerState(containerManager, containerOne,
- LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
-
- // Container replica with datanodeOne as originNodeId is already
- // QUASI_CLOSED. Now we will tell SCM that container replica from
- // datanodeTwo is also QUASI_CLOSED, but has higher sequenceId.
- final ContainerReportsProto containerReport = getContainerReportsProto(
- containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
- datanodeTwo.getUuidString(), 999999L);
+ containerStateManager.loadContainer(containerOne);
+ containerStateManager.loadContainer(containerTwo);
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
- final ContainerReportFromDatanode containerReportFromDatanode =
- new ContainerReportFromDatanode(datanodeTwo, containerReport);
- reportHandler.onMessage(containerReportFromDatanode, publisher);
+ containerOneReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerOne.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
- // Now we should get force close container event for containerOne on
- // datanodeTwo
- Mockito.verify(publisher, Mockito.times(1))
- .fireEvent(Mockito.any(), Mockito.any());
- // TODO: verify whether are actually getting a force close container
- // datanode command for containerOne/datanodeTwo
-
- // The sequence id of the container should have been updated.
- Assert.assertEquals(999999L, containerOne.getSequenceId());
+ }
+ });
- // Now datanodeTwo should close containerOne.
- final ContainerReportsProto containerReportTwo = getContainerReportsProto(
- containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeTwo.getUuidString(), 999999L);
- final ContainerReportFromDatanode containerReportFromDatanodeTwo =
- new ContainerReportFromDatanode(datanodeTwo, containerReportTwo);
- reportHandler.onMessage(containerReportFromDatanodeTwo, publisher);
+ containerTwoReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ containerTwo.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
- // The container should be closed in SCM now.
- Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
- }
+ }
+ });
- @Test
- public void testQuasiClosedWithSameOriginNodeReplica()
- throws NodeNotFoundException, IOException {
- /*
- * The container is in QUASI_CLOSED state.
- * - One of the replica is in QUASI_CLOSED state
- * - The other two replica are in OPEN/CLOSING state
- *
- * The datanode reports a QUASI_CLOSED replica which has the same
- * origin node id as the existing QUASI_CLOSED replica.
- *
- * In this case SCM should not CLOSE the container.
- */
- final NodeManager nodeManager = new MockNodeManager(true, 10);
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final ReplicationActivityStatus replicationActivityStatus =
- new ReplicationActivityStatus(scheduler);
- replicationActivityStatus.enableReplication();
-
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
- final ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, pipelineManager, containerManager,
- replicationActivityStatus);
- final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
- NodeState.HEALTHY).iterator();
- final DatanodeDetails datanodeOne = nodeIterator.next();
- final DatanodeDetails datanodeTwo = nodeIterator.next();
- final DatanodeDetails datanodeThree = nodeIterator.next();
- final ContainerInfo containerOne =
- getContainer(LifeCycleState.QUASI_CLOSED);
- final ContainerInfo containerTwo =
- getContainer(LifeCycleState.CLOSED);
- final Set<ContainerID> containerIDSet = Stream.of(
- containerOne.containerID(), containerTwo.containerID())
- .collect(Collectors.toSet());
- final Set<ContainerReplica> containerOneReplicas = getReplicas(
- containerOne.containerID(),
- ContainerReplicaProto.State.QUASI_CLOSED,
- datanodeOne);
- containerOneReplicas.addAll(getReplicas(
- containerOne.containerID(),
- ContainerReplicaProto.State.CLOSING,
- datanodeTwo));
- final Set<ContainerReplica> containerTwoReplicas = getReplicas(
- containerTwo.containerID(),
- ContainerReplicaProto.State.CLOSED,
- datanodeOne, datanodeTwo, datanodeThree);
- nodeManager.setContainers(datanodeOne, containerIDSet);
- nodeManager.setContainers(datanodeTwo, containerIDSet);
- nodeManager.setContainers(datanodeThree,
- Collections.singleton(containerTwo.containerID()));
-
- addContainerToContainerManager(
- containerManager, containerOne, containerOneReplicas);
- addContainerToContainerManager(
- containerManager, containerTwo, containerTwoReplicas);
-
- mockUpdateContainerReplica(
- containerManager, containerOne, containerOneReplicas);
- mockUpdateContainerState(containerManager, containerOne,
- LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
-
- // containerOne is QUASI_CLOSED in datanodeOne and CLOSING in datanodeTwo.
- // Now datanodeThree is sending container report which says that it has
- // containerOne replica, but the originNodeId of this replica is
- // datanodeOne. In this case we should not force close the container even
- // though we got two QUASI_CLOSED replicas.
final ContainerReportsProto containerReport = getContainerReportsProto(
- containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
+ containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+
final ContainerReportFromDatanode containerReportFromDatanode =
- new ContainerReportFromDatanode(datanodeThree, containerReport);
+ new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
- Mockito.verify(publisher, Mockito.times(0))
- .fireEvent(Mockito.any(), Mockito.any());
+ Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
}
private static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId) {
- return getContainerReportsProto(containerId, state, originNodeId, 100L);
- }
-
- private static ContainerReportsProto getContainerReportsProto(
- final ContainerID containerId, final ContainerReplicaProto.State state,
- final String originNodeId, final long bcsid) {
final ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();
final ContainerReplicaProto replicaProto =
@@ -674,7 +501,7 @@ public class TestContainerReportHandler {
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
- .setBlockCommitSequenceId(bcsid)
+ .setBlockCommitSequenceId(10000L)
.setDeleteTransactionId(0)
.build();
return crBuilder.addReports(replicaProto).build();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
deleted file mode 100644
index 860ec4d..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.Set;
-
-/**
- * Helper methods for testing ContainerReportHandler and
- * IncrementalContainerReportHandler.
- */
-public final class TestContainerReportHelper {
-
- private TestContainerReportHelper() {}
-
- static void addContainerToContainerManager(
- final ContainerManager containerManager, final ContainerInfo container,
- final Set<ContainerReplica> replicas) throws ContainerNotFoundException {
- Mockito.when(containerManager.getContainer(container.containerID()))
- .thenReturn(container);
- Mockito.when(
- containerManager.getContainerReplicas(container.containerID()))
- .thenReturn(replicas);
- }
-
- static void mockUpdateContainerReplica(
- final ContainerManager containerManager,
- final ContainerInfo containerInfo, final Set<ContainerReplica> replicas)
- throws ContainerNotFoundException {
- Mockito.doAnswer((Answer<Void>) invocation -> {
- Object[] args = invocation.getArguments();
- if (args[0].equals(containerInfo.containerID())) {
- ContainerReplica replica = (ContainerReplica) args[1];
- replicas.remove(replica);
- replicas.add(replica);
- }
- return null;
- }).when(containerManager).updateContainerReplica(
- Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
- }
-
- static void mockUpdateContainerState(
- final ContainerManager containerManager,
- final ContainerInfo containerInfo,
- final LifeCycleEvent event, final LifeCycleState state)
- throws IOException {
- Mockito.doAnswer((Answer<LifeCycleState>) invocation -> {
- containerInfo.setState(state);
- return containerInfo.getState();
- }).when(containerManager).updateContainerState(
- containerInfo.containerID(), event);
- }
-
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index 6c9383f..7b8f9fc7b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -17,47 +17,78 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Set;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.addContainerToContainerManager;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.mockUpdateContainerReplica;
-import static org.apache.hadoop.hdds.scm.container
- .TestContainerReportHelper.mockUpdateContainerState;
/**
* Test cases to verify the functionality of IncrementalContainerReportHandler.
*/
public class TestIncrementalContainerReportHandler {
+ private ContainerManager containerManager;
+ private ContainerStateManager containerStateManager;
+ private EventPublisher publisher;
+
+ @Before
+ public void setup() throws IOException {
+ final Configuration conf = new OzoneConfiguration();
+ this.containerManager = Mockito.mock(ContainerManager.class);
+ this.containerStateManager = new ContainerStateManager(conf);
+ this.publisher = Mockito.mock(EventPublisher.class);
+
+
+ Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+ .thenAnswer(invocation -> containerStateManager
+ .getContainer((ContainerID)invocation.getArguments()[0]));
+
+ Mockito.when(containerManager.getContainerReplicas(
+ Mockito.any(ContainerID.class)))
+ .thenAnswer(invocation -> containerStateManager
+ .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+
+ Mockito.doAnswer(invocation -> {
+ containerStateManager
+ .updateContainerState((ContainerID)invocation.getArguments()[0],
+ (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
+ return null;
+ }).when(containerManager).updateContainerState(
+ Mockito.any(ContainerID.class),
+ Mockito.any(HddsProtos.LifeCycleEvent.class));
+
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ containerStateManager.close();
+ }
+
+
@Test
public void testClosingToClosed() throws IOException {
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final IncrementalContainerReportHandler reportHandler =
- new IncrementalContainerReportHandler(
- pipelineManager, containerManager);
+ new IncrementalContainerReportHandler(containerManager);
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -67,34 +98,31 @@ public class TestIncrementalContainerReportHandler {
ContainerReplicaProto.State.CLOSING,
datanodeOne, datanodeTwo, datanodeThree);
- addContainerToContainerManager(
- containerManager, container, containerReplicas);
- mockUpdateContainerReplica(
- containerManager, container, containerReplicas);
- mockUpdateContainerState(containerManager, container,
- LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
+ containerStateManager.loadContainer(container);
+ containerReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ container.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+
+ }
+ });
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final IncrementalContainerReportFromDatanode icrFromDatanode =
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icrFromDatanode, publisher);
- Assert.assertEquals(
- LifeCycleState.CLOSED, container.getState());
+ Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
}
@Test
public void testClosingToQuasiClosed() throws IOException {
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final IncrementalContainerReportHandler reportHandler =
- new IncrementalContainerReportHandler(
- pipelineManager, containerManager);
+ new IncrementalContainerReportHandler(containerManager);
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -104,34 +132,32 @@ public class TestIncrementalContainerReportHandler {
ContainerReplicaProto.State.CLOSING,
datanodeOne, datanodeTwo, datanodeThree);
- addContainerToContainerManager(
- containerManager, container, containerReplicas);
- mockUpdateContainerReplica(
- containerManager, container, containerReplicas);
- mockUpdateContainerState(containerManager, container,
- LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
+ containerStateManager.loadContainer(container);
+ containerReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ container.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
+
+ }
+ });
+
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeOne.getUuidString());
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final IncrementalContainerReportFromDatanode icrFromDatanode =
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icrFromDatanode, publisher);
- Assert.assertEquals(
- LifeCycleState.QUASI_CLOSED, container.getState());
+ Assert.assertEquals(LifeCycleState.QUASI_CLOSED, container.getState());
}
@Test
public void testQuasiClosedToClosed() throws IOException {
- final ContainerManager containerManager = Mockito.mock(
- ContainerManager.class);
- final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final IncrementalContainerReportHandler reportHandler =
- new IncrementalContainerReportHandler(
- pipelineManager, containerManager);
+ new IncrementalContainerReportHandler(containerManager);
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -145,38 +171,25 @@ public class TestIncrementalContainerReportHandler {
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeThree));
+ containerStateManager.loadContainer(container);
+ containerReplicas.forEach(r -> {
+ try {
+ containerStateManager.updateContainerReplica(
+ container.containerID(), r);
+ } catch (ContainerNotFoundException ignored) {
- addContainerToContainerManager(
- containerManager, container, containerReplicas);
- mockUpdateContainerReplica(
- containerManager, container, containerReplicas);
- mockUpdateContainerState(containerManager, container,
- LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
+ }
+ });
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
- ContainerReplicaProto.State.QUASI_CLOSED,
- datanodeOne.getUuidString(), 999999L);
- final EventPublisher publisher = Mockito.mock(EventPublisher.class);
- final IncrementalContainerReportFromDatanode icrFromDatanode =
- new IncrementalContainerReportFromDatanode(
- datanodeOne, containerReport);
- reportHandler.onMessage(icrFromDatanode, publisher);
-
- // SCM should issue force close.
- Mockito.verify(publisher, Mockito.times(1))
- .fireEvent(Mockito.any(), Mockito.any());
-
- final IncrementalContainerReportProto containerReportTwo =
- getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.CLOSED,
- datanodeOne.getUuidString(), 999999L);
- final IncrementalContainerReportFromDatanode icrTwoFromDatanode =
+ datanodeThree.getUuidString());
+ final IncrementalContainerReportFromDatanode icr =
new IncrementalContainerReportFromDatanode(
- datanodeOne, containerReportTwo);
- reportHandler.onMessage(icrTwoFromDatanode, publisher);
- Assert.assertEquals(
- LifeCycleState.CLOSED, container.getState());
+ datanodeOne, containerReport);
+ reportHandler.onMessage(icr, publisher);
+ Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
}
private static IncrementalContainerReportProto
@@ -184,15 +197,6 @@ public class TestIncrementalContainerReportHandler {
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId) {
- return getIncrementalContainerReportProto(
- containerId, state, originNodeId, 100L);
- }
-
- private static IncrementalContainerReportProto
- getIncrementalContainerReportProto(
- final ContainerID containerId,
- final ContainerReplicaProto.State state,
- final String originNodeId, final long bcsid) {
final IncrementalContainerReportProto.Builder crBuilder =
IncrementalContainerReportProto.newBuilder();
final ContainerReplicaProto replicaProto =
@@ -208,7 +212,7 @@ public class TestIncrementalContainerReportHandler {
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
- .setBlockCommitSequenceId(bcsid)
+ .setBlockCommitSequenceId(10000L)
.setDeleteTransactionId(0)
.build();
return crBuilder.addReport(replicaProto).build();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java
deleted file mode 100644
index c36ba75..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import static org.junit.Assert.*;
-
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.block.BlockManager;
-import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
-import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.Scheduler;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Tests for ReplicationActivityStatus.
- */
-public class TestReplicationActivityStatus {
-
- private static EventQueue eventQueue;
- private static ReplicationActivityStatus replicationActivityStatus;
-
- @BeforeClass
- public static void setup() {
- eventQueue = new EventQueue();
- replicationActivityStatus = new ReplicationActivityStatus(
- new Scheduler("SCMCommonScheduler", false, 1));
-
- OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
- ozoneConfiguration.set(HddsConfigKeys.
- HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT, "3s");
-
- SCMClientProtocolServer scmClientProtocolServer =
- Mockito.mock(SCMClientProtocolServer.class);
- BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
- ChillModeHandler chillModeHandler =
- new ChillModeHandler(ozoneConfiguration, scmClientProtocolServer,
- blockManager, replicationActivityStatus);
- eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
-
- }
-
- @Test
- public void testReplicationStatusForChillMode()
- throws TimeoutException, InterruptedException {
- assertFalse(replicationActivityStatus.isReplicationEnabled());
- // In chill mode replication process should be stopped.
- eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
- new SCMChillModeManager.ChillModeStatus(true));
- assertFalse(replicationActivityStatus.isReplicationEnabled());
-
- // Replication should be enabled when chill mode is off.
- eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
- new SCMChillModeManager.ChillModeStatus(false));
- GenericTestUtils.waitFor(() -> {
- return replicationActivityStatus.isReplicationEnabled();
- }, 10, 1000*5);
- assertTrue(replicationActivityStatus.isReplicationEnabled());
- }
-}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
index 1846b0c..ea59af3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -48,10 +48,10 @@ public class TestSCMClientProtocolServer {
eventQueue = new EventQueue();
scmClientProtocolServer = new SCMClientProtocolServer(config, null);
BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
- ReplicationActivityStatus replicationActivityStatus =
- Mockito.mock(ReplicationActivityStatus.class);
+ ReplicationManager replicationManager =
+ Mockito.mock(ReplicationManager.class);
ChillModeHandler chillModeHandler = new ChillModeHandler(config,
- scmClientProtocolServer, blockManager, replicationActivityStatus);
+ scmClientProtocolServer, blockManager, replicationManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org