Posted to commits@ozone.apache.org by so...@apache.org on 2020/10/14 08:04:16 UTC
[hadoop-ozone] branch HDDS-1880-Decom updated: HDDS-4322. Add integration tests for Decommission and resolve issues detected by the tests. (#1484)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
new f74d73a HDDS-4322. Add integration tests for Decommission and resolve issues detected by the tests. (#1484)
f74d73a is described below
commit f74d73ada0db79a3a14df89d9d6474ef39465424
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Oct 14 09:03:58 2020 +0100
HDDS-4322. Add integration tests for Decommission and resolve issues detected by the tests. (#1484)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 3 +
.../hdds/scm/node/DatanodeAdminMonitorImpl.java | 3 +
.../hadoop/hdds/scm/node/NewNodeHandler.java | 22 ++
.../hdds/scm/node/NodeDecommissionManager.java | 16 +
.../hadoop/hdds/scm/node/NodeStateManager.java | 25 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 25 --
.../hdds/scm/server/StorageContainerManager.java | 7 +-
.../java/org/apache/hadoop/ozone/TestDataUtil.java | 11 +-
.../scm/node/TestDecommissionAndMaintenance.java | 359 ++++++++++++++++++---
9 files changed, 401 insertions(+), 70 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index c60eb1c..c7d5a28 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -476,6 +476,9 @@ public class DatanodeDetails extends NodeImpl implements
this.setupTime = details.getSetupTime();
this.revision = details.getRevision();
this.buildDate = details.getBuildDate();
+ this.persistedOpState = details.getPersistedOpState();
+ this.persistedOpStateExpiryEpochSec =
+ details.getPersistedOpStateExpiryEpochSec();
return this;
}
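For context: this method copies fields from another DatanodeDetails instance, and a DatanodeDetails is rebuilt this way when a node's details are refreshed (for example on re-registration). A field the copy skips appears to come back as its default. A minimal sketch of the failure mode, assuming a simplified, hypothetical stand-in for the real class:

enum OpState { IN_SERVICE, DECOMMISSIONING, DECOMMISSIONED }

class NodeInfo {
  private OpState persistedOpState = OpState.IN_SERVICE;
  private long persistedOpStateExpiryEpochSec = 0;

  NodeInfo copyFrom(NodeInfo details) {
    // Before the fix above, the next two assignments were missing, so a
    // node rebuilt while DECOMMISSIONING came back as IN_SERVICE.
    this.persistedOpState = details.persistedOpState;
    this.persistedOpStateExpiryEpochSec =
        details.persistedOpStateExpiryEpochSec;
    return this;
  }
}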
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index 0bbd13d..cf01af5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -294,6 +294,9 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor {
"in containerManager", cid, dn);
}
}
+ LOG.info("{} has {} sufficientlyReplicated, {} underReplicated and {} " +
+ "unhealthy containers",
+ dn, sufficientlyReplicated, underReplicated, unhealthy);
return underReplicated == 0 && unhealthy == 0;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index a40a63a..42369fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -20,9 +20,13 @@ package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Handles New Node event.
@@ -30,11 +34,16 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
public class NewNodeHandler implements EventHandler<DatanodeDetails> {
private final PipelineManager pipelineManager;
+ private final NodeDecommissionManager decommissionManager;
private final ConfigurationSource conf;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NewNodeHandler.class);
public NewNodeHandler(PipelineManager pipelineManager,
+ NodeDecommissionManager decommissionManager,
ConfigurationSource conf) {
this.pipelineManager = pipelineManager;
+ this.decommissionManager = decommissionManager;
this.conf = conf;
}
@@ -42,5 +51,18 @@ public class NewNodeHandler implements EventHandler<DatanodeDetails> {
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
pipelineManager.triggerPipelineCreation();
+ HddsProtos.NodeOperationalState opState
+ = datanodeDetails.getPersistedOpState();
+ if (opState != HddsProtos.NodeOperationalState.IN_SERVICE) {
+ try {
+ decommissionManager.continueAdminForNode(datanodeDetails);
+ } catch (NodeNotFoundException e) {
+ // Should not happen, as the node registering is what triggered this
+ // event handler.
+ LOG.warn("NodeNotFound when adding the node to the decommissionManager",
+ e);
+ }
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 3258fef..d7432bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -233,6 +233,22 @@ public class NodeDecommissionManager {
}
}
+ /**
+ * If an SCM is restarted, then upon re-registration the datanode will already
+ * be in the DECOMMISSIONING or ENTERING_MAINTENANCE state. In that case, it
+ * needs to be added back into the monitor to track its progress.
+ * @param dn Datanode to add back to tracking.
+ * @throws NodeNotFoundException if the node is not registered with SCM.
+ */
+ public synchronized void continueAdminForNode(DatanodeDetails dn)
+ throws NodeNotFoundException {
+ NodeOperationalState opState = getNodeStatus(dn).getOperationalState();
+ if (opState == NodeOperationalState.DECOMMISSIONING
+ || opState == NodeOperationalState.ENTERING_MAINTENANCE) {
+ monitor.startMonitoring(dn, 0);
+ }
+ }
+
public synchronized void startDecommission(DatanodeDetails dn)
throws NodeNotFoundException, InvalidNodeStateException {
NodeStatus nodeStatus = getNodeStatus(dn);
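Read together with the NewNodeHandler change above, this gives the restart-resume path. A condensed sketch of the hand-off, using only calls that appear in this commit's diffs:

// 1. SCM restarts and loses its in-memory operational states.
// 2. The DN re-registers; NodeStateManager.addNode() (changed below) seeds
//    the NodeStatus from the DN's persisted state and fires
//    SCMEvents.NEW_NODE.
// 3. NewNodeHandler.onMessage() sees a non-IN_SERVICE persisted state and
//    hands the node back to the decommission manager:
if (dn.getPersistedOpState() != HddsProtos.NodeOperationalState.IN_SERVICE) {
  decommissionManager.continueAdminForNode(dn);
}
// 4. continueAdminForNode() re-adds the node to the admin monitor, which
//    drives it to completion (DECOMMISSIONED or, for maintenance,
//    IN_MAINTENANCE):
if (opState == NodeOperationalState.DECOMMISSIONING
    || opState == NodeOperationalState.ENTERING_MAINTENANCE) {
  monitor.startMonitoring(dn, 0);
}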
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 47ce56e..d7af7f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -239,12 +239,33 @@ public class NodeStateManager implements Runnable, Closeable {
*/
public void addNode(DatanodeDetails datanodeDetails)
throws NodeAlreadyExistsException {
- nodeStateMap.addNode(datanodeDetails, new NodeStatus(
- NodeOperationalState.IN_SERVICE, nodeHealthSM.getInitialState()));
+ NodeStatus newNodeStatus = newNodeStatus(datanodeDetails);
+ nodeStateMap.addNode(datanodeDetails, newNodeStatus);
eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
}
/**
+ * When a node registers with SCM, the operational state stored on the
+ * datanode is the source of truth. Therefore, if the datanode reports
+ * anything other than IN_SERVICE on registration, the state in SCM should be
+ * updated to reflect the datanode state.
+ * @param dn DatanodeDetails reported by the datanode
+ */
+ private NodeStatus newNodeStatus(DatanodeDetails dn) {
+ HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState();
+ if (dnOpState != NodeOperationalState.IN_SERVICE) {
+ LOG.info("Updating nodeOperationalState on registration as the " +
+ "datanode has a persisted state of {} and expiry of {}",
+ dnOpState, dn.getPersistedOpStateExpiryEpochSec());
+ return new NodeStatus(dnOpState, nodeHealthSM.getInitialState(),
+ dn.getPersistedOpStateExpiryEpochSec());
+ } else {
+ return new NodeStatus(
+ NodeOperationalState.IN_SERVICE, nodeHealthSM.getInitialState());
+ }
+ }
+
+ /**
* Adds a pipeline in the node2PipelineMap.
* @param pipeline - Pipeline to be added
*/
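The registration rule above is essentially a pure function of the persisted state, which makes it easy to reason about in isolation. A minimal sketch, using the two NodeStatus constructors seen in the hunk:

// Sketch: the registration-time decision from newNodeStatus(), reduced to
// its inputs. The real code lives in NodeStateManager above.
static NodeStatus statusOnRegistration(
    HddsProtos.NodeOperationalState persisted,
    long expiryEpochSec,
    HddsProtos.NodeState initialHealth) {
  if (persisted != HddsProtos.NodeOperationalState.IN_SERVICE) {
    // Trust the DN: carry its persisted admin state and expiry into SCM.
    return new NodeStatus(persisted, initialHealth, expiryEpochSec);
  }
  return new NodeStatus(
      HddsProtos.NodeOperationalState.IN_SERVICE, initialHealth);
}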
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 825593d..323d79a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -35,7 +35,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -350,7 +349,6 @@ public class SCMNodeManager implements NodeManager {
datanodeDetails.toString());
}
}
- registerInitialDatanodeOpState(datanodeDetails);
return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
.setDatanode(datanodeDetails)
@@ -359,29 +357,6 @@ public class SCMNodeManager implements NodeManager {
}
/**
- * When a node registers with SCM, the operational state stored on the
- * datanode is the source of truth. Therefore, if the datanode reports
- * anything other than IN_SERVICE on registration, the state in SCM should be
- * updated to reflect the datanode state.
- * @param dn
- */
- private void registerInitialDatanodeOpState(DatanodeDetails dn) {
- try {
- HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState();
- if (dnOpState != NodeOperationalState.IN_SERVICE) {
- LOG.info("Updating nodeOperationalState on registration as the " +
- "datanode has a persisted state of {} and expiry of {}",
- dnOpState, dn.getPersistedOpStateExpiryEpochSec());
- setNodeOperationalState(dn, dnOpState,
- dn.getPersistedOpStateExpiryEpochSec());
- }
- } catch (NodeNotFoundException e) {
- LOG.error("Unable to find the node when setting the operational state",
- e);
- }
- }
-
- /**
* Add an entry to the dnsToUuidMap, which maps hostname / IP to the DNs
* running on that host. As each address can have many DNs running on it,
* this is a one to many mapping.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0319845..e26d521 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -296,7 +296,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
CommandStatusReportHandler cmdStatusReportHandler =
new CommandStatusReportHandler();
- NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager, conf);
+ NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager,
+ scmDecommissionManager, conf);
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
@@ -339,8 +340,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
blockProtocolServer = new SCMBlockProtocolServer(conf, this);
clientProtocolServer = new SCMClientProtocolServer(conf, this);
httpServer = new StorageContainerManagerHttpServer(conf);
- scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
- containerManager, eventQueue, replicationManager);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -456,6 +455,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
scmSafeModeManager = new SCMSafeModeManager(conf,
containerManager.getContainers(), pipelineManager, eventQueue);
}
+ scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
+ containerManager, eventQueue, replicationManager);
}
/**
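The relocation of the NodeDecommissionManager construction appears to be ordering-driven: NewNodeHandler now takes the manager as a constructor argument, so the manager must exist before the handler is wired up. Condensed wiring (the final addHandler line is assumed from the surrounding handler registrations; it is not shown in this hunk):

// Construct the manager first, then the handler that depends on it.
scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
    containerManager, eventQueue, replicationManager);
NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager,
    scmDecommissionManager, conf);
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);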
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index 115336e..4cdefb8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -73,9 +73,16 @@ public final class TestDataUtil {
public static void createKey(OzoneBucket bucket, String keyName,
String content) throws IOException {
+ createKey(bucket, keyName, ReplicationFactor.ONE,
+ ReplicationType.STAND_ALONE, content);
+ }
+
+ public static void createKey(OzoneBucket bucket, String keyName,
+ ReplicationFactor repFactor, ReplicationType repType, String content)
+ throws IOException {
try (OutputStream stream = bucket
- .createKey(keyName, content.length(), ReplicationType.STAND_ALONE,
- ReplicationFactor.ONE, new HashMap<>())) {
+ .createKey(keyName, content.length(), repType, repFactor,
+ new HashMap<>())) {
stream.write(content.getBytes());
}
}
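The new overload lets tests create keys with real replication (RATIS, factor three) instead of the single STAND_ALONE replica the old method hard-coded, which the decommission tests below need so there is something to re-replicate. Example usage (a sketch; the volume and bucket names mirror the test below):

OzoneBucket bucket =
    TestDataUtil.createVolumeAndBucket(cluster, "vol1", "bucket1");
// Old behaviour, still available: one STAND_ALONE replica.
TestDataUtil.createKey(bucket, "key0", "some content");
// New overload: three RATIS replicas.
TestDataUtil.createKey(bucket, "key1",
    ReplicationFactor.THREE, ReplicationType.RATIS, "some content");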
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index f39af44..8f02878 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -17,28 +17,59 @@
*/
package org.apache.hadoop.ozone.scm.node;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static java.util.concurrent.TimeUnit.SECONDS;
import static junit.framework.TestCase.assertEquals;
-import static org.apache.hadoop.hdds.HddsConfigKeys.*;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.junit.Assert.fail;
/**
* Test from the scmclient for decommission and maintenance.
@@ -48,8 +79,15 @@ public class TestDecommissionAndMaintenance {
private static final Logger LOG =
LoggerFactory.getLogger(TestDecommissionAndMaintenance.class);
- private static int numOfDatanodes = 5;
+ private static int numOfDatanodes = 6;
+ private static String bucketName = "bucket1";
+ private static String volName = "vol1";
+ private OzoneBucket bucket;
private MiniOzoneCluster cluster;
+ private NodeManager nm;
+ private ContainerManager cm;
+ private PipelineManager pm;
+ private StorageContainerManager scm;
private ContainerOperationClient scmClient;
@@ -67,70 +105,315 @@ public class TestDecommissionAndMaintenance {
conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+ 1, SECONDS);
+
+ ReplicationManagerConfiguration replicationConf =
+ conf.getObject(ReplicationManagerConfiguration.class);
+ replicationConf.setInterval(Duration.ofSeconds(1));
+ conf.setFromObject(replicationConf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numOfDatanodes)
.build();
cluster.waitForClusterToBeReady();
+ setManagers();
+
+ bucket = TestDataUtil.createVolumeAndBucket(cluster, volName, bucketName);
scmClient = new ContainerOperationClient(conf);
}
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
- @Ignore // HDDS-3345
- public void testNodeCanBeDecommMaintAndRecommissioned()
- throws IOException {
- NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager();
+ // Decommissioning a node with open pipelines should close the pipelines
+ // and hence the open containers; the containers should then be
+ // re-replicated by the replication manager.
+ public void testNodeWithOpenPipelineCanBeDecommissioned()
+ throws Exception {
+ // Generate some data on the empty cluster to create some containers
+ generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ // Locate any container and find its open pipeline
+ final ContainerInfo container =
+ scmClient.listContainer(0, 1).get(0);
+ Pipeline pipeline = pm.getPipeline(container.getPipelineID());
+ assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
+
+ // Ensure all 3 replicas of the container have been reported via ICR
+ GenericTestUtils.waitFor(
+ () -> getContainerReplicas(container).size() == 3,
+ 200, 30000);
+ Set<ContainerReplica> replicas = getContainerReplicas(container);
- List<DatanodeDetails> dns = nm.getAllNodes();
- scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+ final DatanodeDetails toDecommission = getOneDNHostingReplica(replicas);
+ scmClient.decommissionNodes(Arrays.asList(
+ getDNHostAndPort(toDecommission)));
- // Ensure one node is decommissioning
+ // Ensure one node transitioned to DECOMMISSIONING
List<DatanodeDetails> decomNodes = nm.getNodes(
- HddsProtos.NodeOperationalState.DECOMMISSIONING,
- HddsProtos.NodeState.HEALTHY);
+ DECOMMISSIONING,
+ HEALTHY);
assertEquals(1, decomNodes.size());
+ waitForDnToReachOpState(toDecommission, DECOMMISSIONED);
- scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+ // There should now be 4 replicas online as the DN is still alive but
+ // in the DECOMMISSIONED state.
+ Set<ContainerReplica> newReplicas =
+ cm.getContainerReplicas(container.containerID());
+ assertEquals(4, newReplicas.size());
- // Ensure zero nodes are now decommissioning
- decomNodes = nm.getNodes(
- HddsProtos.NodeOperationalState.DECOMMISSIONING,
- HddsProtos.NodeState.HEALTHY);
- assertEquals(0, decomNodes.size());
+ // Stop the decommissioned DN
+ cluster.shutdownHddsDatanode(toDecommission);
+ waitForDnToReachHealthState(toDecommission, DEAD);
- scmClient.startMaintenanceNodes(Arrays.asList(
- getDNHostAndPort(dns.get(0))), 10);
+ // Now that the decommissioned node is dead, we should have
+ // 3 replicas for the tracked container.
+ newReplicas = cm.getContainerReplicas(container.containerID());
+ assertEquals(3, newReplicas.size());
+ }
- // None are decommissioning
- decomNodes = nm.getNodes(
- HddsProtos.NodeOperationalState.DECOMMISSIONING,
- HddsProtos.NodeState.HEALTHY);
- assertEquals(0, decomNodes.size());
+ @Test
+ // After an SCM restart, it will have forgotten all the operational states.
+ // However, the state will have been persisted on the DNs. Therefore on
+ // initial registration, the DN operationalState is the source of truth and
+ // SCM should be updated to reflect it.
+ public void testDecommissionedStateReinstatedAfterSCMRestart()
+ throws Exception {
+ // Decommission any node and wait for it to be DECOMMISSIONED
+ generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ DatanodeDetails dn = nm.getAllNodes().get(0);
+ scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+ waitForDnToReachOpState(dn, DECOMMISSIONED);
- // One is in Maintenance
- decomNodes = nm.getNodes(
- HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
- HddsProtos.NodeState.HEALTHY);
- assertEquals(1, decomNodes.size());
+ cluster.restartStorageContainerManager(true);
+ setManagers();
+ DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
- scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+ // On initial registration, the DN should report its operational state
+ // and if it is decommissioned, that should be updated in the NodeStatus
+ waitForDnToReachOpState(newDn, DECOMMISSIONED);
+ // Also confirm the datanodeDetails correctly reflect the operational
+ // state.
+ waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
+ }
+
+ @Test
+ // If a node has not yet completed decommission and SCM is restarted, then
+ // when it re-registers it should re-enter the decommission workflow and
+ // complete decommissioning.
+ public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()
+ throws Exception {
+ // First stop the replicationManager so nodes marked for decommission cannot
+ // make any progress. The node will be stuck in DECOMMISSIONING.
+ scm.getReplicationManager().stop();
+ // Generate some data and then pick a DN to decommission which is hosting a
+ // container. This ensures it will not decommission immediately due to
+ // having no containers.
+ generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ final ContainerInfo container =
+ scmClient.listContainer(0, 1).get(0);
+ GenericTestUtils.waitFor(
+ () -> getContainerReplicas(container).size() == 3,
+ 200, 30000);
+ final DatanodeDetails dn
+ = getOneDNHostingReplica(getContainerReplicas(container));
+ scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
- // None are in maintenance
- decomNodes = nm.getNodes(
- HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
- HddsProtos.NodeState.HEALTHY);
- assertEquals(0, decomNodes.size());
+ // Wait for the state to be persisted on the DN so it can report it on
+ // restart of SCM.
+ waitForDnToReachPersistedOpState(dn, DECOMMISSIONING);
+ cluster.restartStorageContainerManager(true);
+ setManagers();
+
+ // After the SCM restart, the DN should report as DECOMMISSIONING, then
+ // it should re-enter the decommission workflow and move to DECOMMISSIONED
+ DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
+ waitForDnToReachOpState(newDn, DECOMMISSIONED);
+ waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
}
+ @Test
+ // If a node was decommissioned and then stopped so it is dead, and it is
+ // then recommissioned in SCM and restarted, the SCM state should be taken
+ // as the source of truth: the node should go to the IN_SERVICE state and
+ // the state should be updated on the DN.
+ public void testStoppedDecommissionedNodeTakesSCMStateOnRestart()
+ throws Exception {
+ // Decommission node and wait for it to be DECOMMISSIONED
+ generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ DatanodeDetails dn = nm.getAllNodes().get(0);
+ scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+ waitForDnToReachOpState(dn, DECOMMISSIONED);
+ waitForDnToReachPersistedOpState(dn, DECOMMISSIONED);
+
+ int dnIndex = cluster.getHddsDatanodeIndex(dn);
+ cluster.shutdownHddsDatanode(dnIndex);
+ waitForDnToReachHealthState(dn, DEAD);
+
+ // Datanode is shutdown and dead. Now recommission it in SCM
+ scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+
+ // Now restart it and ensure it remains IN_SERVICE
+ cluster.restartHddsDatanode(dnIndex, true);
+ DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
+
+ // As SCM was not restarted, this is not an initial registration. The DN
+ // should report its operational state and if it differs from what SCM
+ // has, then the SCM state should be used and the DN state updated.
+ waitForDnToReachHealthState(newDn, HEALTHY);
+ waitForDnToReachOpState(newDn, IN_SERVICE);
+ waitForDnToReachPersistedOpState(dn, IN_SERVICE);
+ }
+
+ @Test
+ // A node which is decommissioning or decommissioned can be moved back to
+ // IN_SERVICE.
+ public void testDecommissionedNodeCanBeRecommissioned() throws Exception {
+ generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+ DatanodeDetails dn = nm.getAllNodes().get(0);
+ scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+
+ GenericTestUtils.waitFor(
+ () -> !dn.getPersistedOpState()
+ .equals(IN_SERVICE),
+ 200, 30000);
+
+ scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+ waitForDnToReachOpState(dn, IN_SERVICE);
+ waitForDnToReachPersistedOpState(dn, IN_SERVICE);
+ }
+
+ /**
+ * Sets the instance variables to the values for the current MiniCluster.
+ */
+ private void setManagers() {
+ scm = cluster.getStorageContainerManager();
+ nm = scm.getScmNodeManager();
+ cm = scm.getContainerManager();
+ pm = scm.getPipelineManager();
+ }
+
+ /**
+ * Generates some data on the cluster so the cluster has some containers.
+ * @param keyCount The number of keys to create
+ * @param keyPrefix The prefix to use for the key name.
+ * @param repFactor The replication Factor for the keys
+ * @param repType The replication Type for the keys
+ * @throws IOException
+ */
+ private void generateData(int keyCount, String keyPrefix,
+ ReplicationFactor repFactor, ReplicationType repType) throws IOException {
+ for (int i=0; i<keyCount; i++) {
+ TestDataUtil.createKey(bucket, keyPrefix + i, repFactor, repType,
+ "this is the content");
+ }
+ }
+
+ /**
+ * Retrieves the NodeStatus for the given DN or fails the test if the
+ * Node cannot be found. This is a helper method to allow the nodeStatus to be
+ * checked in lambda expressions.
+ * @param dn Datanode for which to retrieve the NodeStatus.
+ * @return The NodeStatus of the given datanode.
+ */
+ private NodeStatus getNodeStatus(DatanodeDetails dn) {
+ NodeStatus status = null;
+ try {
+ status = nm.getNodeStatus(dn);
+ } catch (NodeNotFoundException e) {
+ fail("Unexpected exception getting the nodeState");
+ }
+ return status;
+ }
+
+ /**
+ * Retrieves the containerReplica set for a given container or fails the test
+ * if the container cannot be found. This is a helper method to allow the
+ * container replica count to be checked in a lambda expression.
+ * @param c The container for which to retrieve replicas
+ * @return The set of replicas for the given container.
+ */
+ private Set<ContainerReplica> getContainerReplicas(ContainerInfo c) {
+ Set<ContainerReplica> replicas = null;
+ try {
+ replicas = cm.getContainerReplicas(c.containerID());
+ } catch (ContainerNotFoundException e) {
+ fail("Unexpected ContainerNotFoundException");
+ }
+ return replicas;
+ }
+
+ /**
+ * Select any DN hosting a replica from the Replica Set.
+ * @param replicas The set of ContainerReplica
+ * @return Any datanode associated with one of the replicas
+ */
+ private DatanodeDetails getOneDNHostingReplica(
+ Set<ContainerReplica> replicas) {
+ // Pick an arbitrary replica and return its datanode
+ Iterator<ContainerReplica> iter = replicas.iterator();
+ ContainerReplica c = iter.next();
+ return c.getDatanodeDetails();
+ }
+
+ /**
+ * Given a Datanode, return a string consisting of the hostname and one of its
+ * ports in the form host:port.
+ * @param dn Datanode for which to retrieve the host:port string
+ * @return host:port for the given DN.
+ */
private String getDNHostAndPort(DatanodeDetails dn) {
return dn.getHostName()+":"+dn.getPorts().get(0).getValue();
}
+ /**
+ * Wait for the given datanode to reach the given operational state.
+ * @param dn Datanode for which to check the state
+ * @param state The state to wait for.
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ private void waitForDnToReachOpState(DatanodeDetails dn,
+ HddsProtos.NodeOperationalState state)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(
+ () -> getNodeStatus(dn).getOperationalState().equals(state),
+ 200, 30000);
+ }
+
+ /**
+ * Wait for the given datanode to reach the given Health state.
+ * @param dn Datanode for which to check the state
+ * @param state The state to wait for.
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ private void waitForDnToReachHealthState(DatanodeDetails dn,
+ HddsProtos.NodeState state)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(
+ () -> getNodeStatus(dn).getHealth().equals(state),
+ 200, 30000);
+ }
+
+ /**
+ * Wait for the given datanode to reach the given persisted state.
+ * @param dn Datanode for which to check the state
+ * @param state The state to wait for.
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ private void waitForDnToReachPersistedOpState(DatanodeDetails dn,
+ HddsProtos.NodeOperationalState state)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(
+ () -> dn.getPersistedOpState().equals(state),
+ 200, 30000);
+ }
+
}
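Condensed from the tests above, the full programmatic round-trip through the SCM client is short; the helpers are the ones defined in this test class:

// Decommission a DN, wait for both SCM and the DN itself to agree,
// then recommission it.
scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
waitForDnToReachOpState(dn, DECOMMISSIONED);
waitForDnToReachPersistedOpState(dn, DECOMMISSIONED);

scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
waitForDnToReachOpState(dn, IN_SERVICE);
waitForDnToReachPersistedOpState(dn, IN_SERVICE);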