Posted to commits@ozone.apache.org by so...@apache.org on 2020/10/14 08:04:16 UTC

[hadoop-ozone] branch HDDS-1880-Decom updated: HDDS-4322. Add integration tests for Decommission and resolve issues detected by the tests. (#1484)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
     new f74d73a  HDDS-4322. Add integration tests for Decommission and resolve issues detected by the tests. (#1484)
f74d73a is described below

commit f74d73ada0db79a3a14df89d9d6474ef39465424
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Oct 14 09:03:58 2020 +0100

    HDDS-4322. Add integration tests for Decommission and resolve issues detected by the tests. (#1484)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |   3 +
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   3 +
 .../hadoop/hdds/scm/node/NewNodeHandler.java       |  22 ++
 .../hdds/scm/node/NodeDecommissionManager.java     |  16 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  25 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  25 --
 .../hdds/scm/server/StorageContainerManager.java   |   7 +-
 .../java/org/apache/hadoop/ozone/TestDataUtil.java |  11 +-
 .../scm/node/TestDecommissionAndMaintenance.java   | 359 ++++++++++++++++++---
 9 files changed, 401 insertions(+), 70 deletions(-)
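
For readers skimming the patch: the new tests drive decommission end to end
through the ContainerOperationClient used throughout the diff below. A minimal
sketch of that client usage (the host:port value and the surrounding main
method are illustrative assumptions, not part of this commit):

    import java.util.Arrays;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;

    public class DecommissionSketch {
      public static void main(String[] args) throws Exception {
        // Assumes an ozone-site.xml on the classpath pointing at a live SCM.
        OzoneConfiguration conf = new OzoneConfiguration();
        ContainerOperationClient scmClient = new ContainerOperationClient(conf);
        // "dn-host:9858" is a placeholder; the tests build the address from
        // dn.getHostName() + ":" + dn.getPorts().get(0).getValue().
        scmClient.decommissionNodes(Arrays.asList("dn-host:9858"));
        // A DECOMMISSIONING/DECOMMISSIONED node can be returned to service:
        scmClient.recommissionNodes(Arrays.asList("dn-host:9858"));
      }
    }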

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index c60eb1c..c7d5a28 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -476,6 +476,9 @@ public class DatanodeDetails extends NodeImpl implements
       this.setupTime = details.getSetupTime();
       this.revision = details.getRevision();
       this.buildDate = details.getBuildDate();
+      this.persistedOpState = details.getPersistedOpState();
+      this.persistedOpStateExpiryEpochSec =
+          details.getPersistedOpStateExpiryEpochSec();
       return this;
     }
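
As an aside: the two added Builder lines matter because a DatanodeDetails
built from an existing instance previously dropped the persisted admin state.
A hedged round-trip sketch (the exact builder entry point is an assumption
based on the surrounding class, not shown in this hunk):

    // Illustrative only: copy a DatanodeDetails and check the persisted
    // operational state and its expiry survive the copy.
    DatanodeDetails copy = DatanodeDetails.newBuilder()
        .setDatanodeDetails(original)
        .build();
    assert copy.getPersistedOpState() == original.getPersistedOpState();
    assert copy.getPersistedOpStateExpiryEpochSec()
        == original.getPersistedOpStateExpiryEpochSec();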
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index 0bbd13d..cf01af5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -294,6 +294,9 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor {
             "in containerManager", cid, dn);
       }
     }
+    LOG.info("{} has {} sufficientlyReplicated, {} underReplicated and {} " +
+        "unhealthy containers",
+        dn, sufficientlyReplicated, underReplicated, unhealthy);
     return underReplicated == 0 && unhealthy == 0;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index a40a63a..42369fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -20,9 +20,13 @@ package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handles New Node event.
@@ -30,11 +34,16 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
 public class NewNodeHandler implements EventHandler<DatanodeDetails> {
 
   private final PipelineManager pipelineManager;
+  private final NodeDecommissionManager decommissionManager;
   private final ConfigurationSource conf;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NewNodeHandler.class);
 
   public NewNodeHandler(PipelineManager pipelineManager,
+      NodeDecommissionManager decommissionManager,
       ConfigurationSource conf) {
     this.pipelineManager = pipelineManager;
+    this.decommissionManager = decommissionManager;
     this.conf = conf;
   }
 
@@ -42,5 +51,18 @@ public class NewNodeHandler implements EventHandler<DatanodeDetails> {
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
     pipelineManager.triggerPipelineCreation();
+    HddsProtos.NodeOperationalState opState
+        = datanodeDetails.getPersistedOpState();
+    if (opState
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      try {
+        decommissionManager.continueAdminForNode(datanodeDetails);
+      } catch (NodeNotFoundException e) {
+        // Should not happen, as the node has just registered to call this event
+        // handler.
+        LOG.warn("NodeNotFound when adding the node to the decommissionManager",
+            e);
+      }
+    }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 3258fef..d7432bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -233,6 +233,22 @@ public class NodeDecommissionManager {
     }
   }
 
+  /**
+   * If an SCM is restarted, then upon re-registration the datanode will already
+   * be in DECOMMISSIONING or ENTERING_MAINTENANCE state. In that case, it
+   * needs to be added back into the monitor to track its progress.
+   * @param dn Datanode to add back to tracking.
+   * @throws NodeNotFoundException if the node is not registered with SCM.
+   */
+  public synchronized void continueAdminForNode(DatanodeDetails dn)
+      throws NodeNotFoundException {
+    NodeOperationalState opState = getNodeStatus(dn).getOperationalState();
+    if (opState == NodeOperationalState.DECOMMISSIONING
+        || opState == NodeOperationalState.ENTERING_MAINTENANCE) {
+      monitor.startMonitoring(dn, 0);
+    }
+  }
+
   public synchronized void startDecommission(DatanodeDetails dn)
       throws NodeNotFoundException, InvalidNodeStateException {
     NodeStatus nodeStatus = getNodeStatus(dn);
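
In test terms, the restart path this method enables looks roughly like the
following (all calls appear verbatim in TestDecommissionAndMaintenance further
down; the ordering here is a sketch of the scenario, not new API):

    // Begin decommission, then restart SCM before it completes.
    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
    waitForDnToReachPersistedOpState(dn, DECOMMISSIONING);
    cluster.restartStorageContainerManager(true);
    // On re-registration NewNodeHandler calls continueAdminForNode, which
    // re-adds the node to the monitor so decommission can complete.
    // (The test re-fetches the DatanodeDetails after the restart.)
    waitForDnToReachOpState(dn, DECOMMISSIONED);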
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 47ce56e..d7af7f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -239,12 +239,33 @@ public class NodeStateManager implements Runnable, Closeable {
    */
   public void addNode(DatanodeDetails datanodeDetails)
       throws NodeAlreadyExistsException {
-    nodeStateMap.addNode(datanodeDetails, new NodeStatus(
-        NodeOperationalState.IN_SERVICE, nodeHealthSM.getInitialState()));
+    NodeStatus newNodeStatus = newNodeStatus(datanodeDetails);
+    nodeStateMap.addNode(datanodeDetails, newNodeStatus);
     eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
   }
 
   /**
+   * When a node registers with SCM, the operational state stored on the
+   * datanode is the source of truth. Therefore, if the datanode reports
+   * anything other than IN_SERVICE on registration, the state in SCM should be
+   * updated to reflect the datanode state.
+   * @param dn DatanodeDetails reported by the datanode
+   */
+  private NodeStatus newNodeStatus(DatanodeDetails dn) {
+    HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState();
+    if (dnOpState != NodeOperationalState.IN_SERVICE) {
+      LOG.info("Updating nodeOperationalState on registration as the " +
+              "datanode has a persisted state of {} and expiry of {}",
+          dnOpState, dn.getPersistedOpStateExpiryEpochSec());
+      return new NodeStatus(dnOpState, nodeHealthSM.getInitialState(),
+          dn.getPersistedOpStateExpiryEpochSec());
+    } else {
+      return new NodeStatus(
+          NodeOperationalState.IN_SERVICE, nodeHealthSM.getInitialState());
+    }
+  }
+
+  /**
    * Adds a pipeline in the node2PipelineMap.
    * @param pipeline - Pipeline to be added
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 825593d..323d79a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -35,7 +35,6 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -350,7 +349,6 @@ public class SCMNodeManager implements NodeManager {
             datanodeDetails.toString());
       }
     }
-    registerInitialDatanodeOpState(datanodeDetails);
 
     return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
         .setDatanode(datanodeDetails)
@@ -359,29 +357,6 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
-   * When a node registers with SCM, the operational state stored on the
-   * datanode is the source of truth. Therefore, if the datanode reports
-   * anything other than IN_SERVICE on registration, the state in SCM should be
-   * updated to reflect the datanode state.
-   * @param dn
-   */
-  private void registerInitialDatanodeOpState(DatanodeDetails dn) {
-    try {
-      HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState();
-      if (dnOpState != NodeOperationalState.IN_SERVICE) {
-        LOG.info("Updating nodeOperationalState on registration as the " +
-            "datanode has a persisted state of {} and expiry of {}",
-            dnOpState, dn.getPersistedOpStateExpiryEpochSec());
-        setNodeOperationalState(dn, dnOpState,
-            dn.getPersistedOpStateExpiryEpochSec());
-      }
-    } catch (NodeNotFoundException e) {
-      LOG.error("Unable to find the node when setting the operational state",
-          e);
-    }
-  }
-
-  /**
    * Add an entry to the dnsToUuidMap, which maps hostname / IP to the DNs
    * running on that host. As each address can have many DNs running on it,
    * this is a one to many mapping.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0319845..e26d521 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -296,7 +296,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
 
-    NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager, conf);
+    NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager,
+        scmDecommissionManager, conf);
     StaleNodeHandler staleNodeHandler =
         new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
@@ -339,8 +340,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     blockProtocolServer = new SCMBlockProtocolServer(conf, this);
     clientProtocolServer = new SCMClientProtocolServer(conf, this);
     httpServer = new StorageContainerManagerHttpServer(conf);
-    scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
-        containerManager, eventQueue, replicationManager);
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -456,6 +455,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       scmSafeModeManager = new SCMSafeModeManager(conf,
           containerManager.getContainers(), pipelineManager, eventQueue);
     }
+    scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
+        containerManager, eventQueue, replicationManager);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index 115336e..4cdefb8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -73,9 +73,16 @@ public final class TestDataUtil {
 
   public static void createKey(OzoneBucket bucket, String keyName,
       String content) throws IOException {
+    createKey(bucket, keyName, ReplicationFactor.ONE,
+        ReplicationType.STAND_ALONE, content);
+  }
+
+  public static void createKey(OzoneBucket bucket, String keyName,
+      ReplicationFactor repFactor, ReplicationType repType, String content)
+      throws IOException {
     try (OutputStream stream = bucket
-        .createKey(keyName, content.length(), ReplicationType.STAND_ALONE,
-            ReplicationFactor.ONE, new HashMap<>())) {
+        .createKey(keyName, content.length(), repType, repFactor,
+            new HashMap<>())) {
       stream.write(content.getBytes());
     }
   }
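
The new overload lets tests create keys with replication other than the old
hard-coded STAND_ALONE/ONE. Typical usage, as in the tests below (bucket
setup elided):

    // Creates a key whose container is replicated 3x over RATIS.
    TestDataUtil.createKey(bucket, "key0", ReplicationFactor.THREE,
        ReplicationType.RATIS, "this is the content");

    // The original three-argument form keeps the ONE / STAND_ALONE defaults.
    TestDataUtil.createKey(bucket, "key1", "this is the content");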
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index f39af44..8f02878 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -17,28 +17,59 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static junit.framework.TestCase.assertEquals;
-import static org.apache.hadoop.hdds.HddsConfigKeys.*;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.junit.Assert.fail;
 
 /**
  * Test from the scmclient for decommission and maintenance.
@@ -48,8 +79,15 @@ public class TestDecommissionAndMaintenance {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestDecommissionAndMaintenance.class);
 
-  private static int numOfDatanodes = 5;
+  private static int numOfDatanodes = 6;
+  private static String bucketName = "bucket1";
+  private static String volName = "vol1";
+  private OzoneBucket bucket;
   private MiniOzoneCluster cluster;
+  private NodeManager nm;
+  private ContainerManager cm;
+  private PipelineManager pm;
+  private StorageContainerManager scm;
 
   private ContainerOperationClient scmClient;
 
@@ -67,70 +105,315 @@ public class TestDecommissionAndMaintenance {
     conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+        1, SECONDS);
+
+    ReplicationManagerConfiguration replicationConf =
+        conf.getObject(ReplicationManagerConfiguration.class);
+    replicationConf.setInterval(Duration.ofSeconds(1));
+    conf.setFromObject(replicationConf);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfDatanodes)
         .build();
     cluster.waitForClusterToBeReady();
+    setManagers();
+
+    bucket = TestDataUtil.createVolumeAndBucket(cluster, volName, bucketName);
     scmClient = new ContainerOperationClient(conf);
   }
 
   @After
-  public void tearDown() throws Exception {
+  public void tearDown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
   @Test
-  @Ignore // HDDS-3345
-  public void testNodeCanBeDecommMaintAndRecommissioned()
-      throws IOException {
-    NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager();
+  // Decommissioning a node with open pipelines should close the pipelines
+  // and hence the open containers; the closed containers should then be
+  // re-replicated by the replication manager.
+  public void testNodeWithOpenPipelineCanBeDecommissioned()
+      throws Exception {
+    // Generate some data on the empty cluster to create some containers
+    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    // Locate any container and find its open pipeline
+    final ContainerInfo container =
+        scmClient.listContainer(0, 1).get(0);
+    Pipeline pipeline = pm.getPipeline(container.getPipelineID());
+    assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
+
+    // Ensure all 3 replicas of the container have been reported via ICR
+    GenericTestUtils.waitFor(
+        () -> getContainerReplicas(container).size() == 3,
+        200, 30000);
+    Set<ContainerReplica> replicas = getContainerReplicas(container);
 
-    List<DatanodeDetails> dns = nm.getAllNodes();
-    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+    final DatanodeDetails toDecommission = getOneDNHostingReplica(replicas);
+    scmClient.decommissionNodes(Arrays.asList(
+        getDNHostAndPort(toDecommission)));
 
-    // Ensure one node is decommissioning
+    // Ensure one node transitioned to DECOMMISSIONING
     List<DatanodeDetails> decomNodes = nm.getNodes(
-        HddsProtos.NodeOperationalState.DECOMMISSIONING,
-        HddsProtos.NodeState.HEALTHY);
+        DECOMMISSIONING,
+        HEALTHY);
     assertEquals(1, decomNodes.size());
+    waitForDnToReachOpState(toDecommission, DECOMMISSIONED);
 
-    scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+    // Should now be 4 replicas online as the DN is still alive but
+    // in the DECOMMISSIONED state.
+    Set<ContainerReplica> newReplicas =
+        cm.getContainerReplicas(container.containerID());
+    assertEquals(4, newReplicas.size());
 
-    // Ensure zero nodes are now decommissioning
-    decomNodes = nm.getNodes(
-        HddsProtos.NodeOperationalState.DECOMMISSIONING,
-        HddsProtos.NodeState.HEALTHY);
-    assertEquals(0, decomNodes.size());
+    // Stop the decommissioned DN
+    cluster.shutdownHddsDatanode(toDecommission);
+    waitForDnToReachHealthState(toDecommission, DEAD);
 
-    scmClient.startMaintenanceNodes(Arrays.asList(
-        getDNHostAndPort(dns.get(0))), 10);
+    // Now that the decommissioned node is dead, we should be back to
+    // 3 replicas for the tracked container.
+    newReplicas = cm.getContainerReplicas(container.containerID());
+    assertEquals(3, newReplicas.size());
+  }
 
-    // None are decommissioning
-    decomNodes = nm.getNodes(
-        HddsProtos.NodeOperationalState.DECOMMISSIONING,
-        HddsProtos.NodeState.HEALTHY);
-    assertEquals(0, decomNodes.size());
+  @Test
+  // After an SCM restart, SCM will have forgotten all the operational states.
+  // However, the state will have been persisted on the DNs. Therefore, on
+  // initial registration, the DN operationalState is the source of truth and
+  // SCM should be updated to reflect it.
+  public void testDecommissionedStateReinstatedAfterSCMRestart()
+      throws Exception {
+    // Decommission any node and wait for it to be DECOMMISSIONED
+    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    DatanodeDetails dn = nm.getAllNodes().get(0);
+    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+    waitForDnToReachOpState(dn, DECOMMISSIONED);
 
-    // One is in Maintenance
-    decomNodes = nm.getNodes(
-        HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
-        HddsProtos.NodeState.HEALTHY);
-    assertEquals(1, decomNodes.size());
+    cluster.restartStorageContainerManager(true);
+    setManagers();
+    DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
 
-    scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+    // On initial registration, the DN should report its operational state
+    // and if it is decommissioned, that should be updated in the NodeStatus
+    waitForDnToReachOpState(newDn, DECOMMISSIONED);
+    // Also confirm the datanodeDetails correctly reflect the operational
+    // state.
+    waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
+  }
+
+  @Test
+  // If a node has not yet completed decommission and SCM is restarted, then
+  // when it re-registers it should re-enter the decommission workflow and
+  // complete decommissioning.
+  public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()
+      throws Exception {
+    // First stop the replicationManager so nodes marked for decommission cannot
+    // make any progress. The node will be stuck in DECOMMISSIONING.
+    scm.getReplicationManager().stop();
+    // Generate some data and then pick a DN to decommission which is hosting a
+    // container. This ensures it will not decommission immediately due to
+    // having no containers.
+    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    final ContainerInfo container =
+        scmClient.listContainer(0, 1).get(0);
+    GenericTestUtils.waitFor(
+        () -> getContainerReplicas(container).size() == 3,
+        200, 30000);
+    final DatanodeDetails dn
+        = getOneDNHostingReplica(getContainerReplicas(container));
+    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
 
-    // None are in maintenance
-    decomNodes = nm.getNodes(
-        HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
-        HddsProtos.NodeState.HEALTHY);
-    assertEquals(0, decomNodes.size());
+    // Wait for the state to be persisted on the DN so it can report it on
+    // restart of SCM.
+    waitForDnToReachPersistedOpState(dn, DECOMMISSIONING);
+    cluster.restartStorageContainerManager(true);
+    setManagers();
+
+    // After the SCM restart, the DN should report as DECOMMISSIONING, then
+    // it should re-enter the decommission workflow and move to DECOMMISSIONED
+    DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
+    waitForDnToReachOpState(newDn, DECOMMISSIONED);
+    waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
   }
 
+  @Test
+  // If a node is decommissioned and then stopped so it is dead, and it is then
+  // recommissioned in SCM and restarted, the SCM state should be taken as the
+  // source of truth: the node should return to the IN_SERVICE state and that
+  // state should be persisted on the DN.
+  public void testStoppedDecommissionedNodeTakesSCMStateOnRestart()
+      throws Exception {
+    // Decommission node and wait for it to be DECOMMISSIONED
+    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    DatanodeDetails dn = nm.getAllNodes().get(0);
+    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+    waitForDnToReachOpState(dn, DECOMMISSIONED);
+    waitForDnToReachPersistedOpState(dn, DECOMMISSIONED);
+
+    int dnIndex = cluster.getHddsDatanodeIndex(dn);
+    cluster.shutdownHddsDatanode(dnIndex);
+    waitForDnToReachHealthState(dn, DEAD);
+
+    // Datanode is shutdown and dead. Now recommission it in SCM
+    scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+
+    // Now restart it and ensure it remains IN_SERVICE
+    cluster.restartHddsDatanode(dnIndex, true);
+    DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
+
+    // As SCM was not restarted, this is not an initial registration. The DN
+    // will report its operational state and, if it differs from what SCM
+    // has, the SCM state should be used and the DN state updated.
+    waitForDnToReachHealthState(newDn, HEALTHY);
+    waitForDnToReachOpState(newDn, IN_SERVICE);
+    waitForDnToReachPersistedOpState(dn, IN_SERVICE);
+  }
+
+  @Test
+  // A node which is decommissioning or decommissioned can be moved back to
+  // IN_SERVICE.
+  public void testDecommissionedNodeCanBeRecommissioned() throws Exception {
+    generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS);
+    DatanodeDetails dn = nm.getAllNodes().get(0);
+    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+
+    GenericTestUtils.waitFor(
+        () -> !dn.getPersistedOpState()
+            .equals(IN_SERVICE),
+        200, 30000);
+
+    scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
+    waitForDnToReachOpState(dn, IN_SERVICE);
+    waitForDnToReachPersistedOpState(dn, IN_SERVICE);
+  }
+
+  /**
+   * Sets the instance variables to the values for the current MiniCluster.
+   */
+  private void setManagers() {
+    scm = cluster.getStorageContainerManager();
+    nm = scm.getScmNodeManager();
+    cm = scm.getContainerManager();
+    pm = scm.getPipelineManager();
+  }
+
+  /**
+   * Generates some data on the cluster so the cluster has some containers.
+   * @param keyCount The number of keys to create
+   * @param keyPrefix The prefix to use for the key name.
+   * @param repFactor The replication Factor for the keys
+   * @param repType The replication Type for the keys
+   * @throws IOException
+   */
+  private void generateData(int keyCount, String keyPrefix,
+      ReplicationFactor repFactor, ReplicationType repType) throws IOException {
+    for (int i=0; i<keyCount; i++) {
+      TestDataUtil.createKey(bucket, keyPrefix + i, repFactor, repType,
+          "this is the content");
+    }
+  }
+
+  /**
+   * Retrieves the NodeStatus for the given DN or fails the test if the
+   * Node cannot be found. This is a helper method to allow the nodeStatus to be
+   * checked in lambda expressions.
+   * @param dn Datanode for which to retrieve the NodeStatus.
+   * @return The NodeStatus of the given datanode.
+   */
+  private NodeStatus getNodeStatus(DatanodeDetails dn) {
+    NodeStatus status = null;
+    try {
+      status = nm.getNodeStatus(dn);
+    } catch (NodeNotFoundException e) {
+      fail("Unexpected exception getting the nodeState");
+    }
+    return status;
+  }
+
+  /**
+   * Retrieves the containerReplica set for a given container or fails the test
+   * if the container cannot be found. This is a helper method to allow the
+   * container replica count to be checked in a lambda expression.
+   * @param c The container for which to retrieve replicas
+   * @return The set of replicas for the given container.
+   */
+  private Set<ContainerReplica> getContainerReplicas(ContainerInfo c) {
+    Set<ContainerReplica> replicas = null;
+    try {
+      replicas = cm.getContainerReplicas(c.containerID());
+    } catch (ContainerNotFoundException e) {
+      fail("Unexpected ContainerNotFoundException");
+    }
+    return replicas;
+  }
+
+  /**
+   * Select any DN hosting a replica from the Replica Set.
+   * @param replicas The set of ContainerReplica
+   * @return Any datanode associated with one of the replicas.
+   */
+  private DatanodeDetails getOneDNHostingReplica(
+      Set<ContainerReplica> replicas) {
+    // Pick any replica and return the datanode hosting it
+    Iterator<ContainerReplica> iter = replicas.iterator();
+    ContainerReplica c = iter.next();
+    return c.getDatanodeDetails();
+  }
+
+  /**
+   * Given a Datanode, return a string consisting of the hostname and one of its
+   * ports in the form host:port.
+   * @param dn Datanode for which to retrieve the host:port string.
+   * @return host:port for the given DN.
+   */
   private String getDNHostAndPort(DatanodeDetails dn) {
     return dn.getHostName()+":"+dn.getPorts().get(0).getValue();
   }
 
+  /**
+   * Wait for the given datanode to reach the given operational state.
+   * @param dn Datanode for which to check the state
+   * @param state The state to wait for.
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  private void waitForDnToReachOpState(DatanodeDetails dn,
+      HddsProtos.NodeOperationalState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(
+        () -> getNodeStatus(dn).getOperationalState().equals(state),
+        200, 30000);
+  }
+
+  /**
+   * Wait for the given datanode to reach the given Health state.
+   * @param dn Datanode for which to check the state
+   * @param state The state to wait for.
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  private void waitForDnToReachHealthState(DatanodeDetails dn,
+      HddsProtos.NodeState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(
+        () -> getNodeStatus(dn).getHealth().equals(state),
+        200, 30000);
+  }
+
+  /**
+   * Wait for the given datanode to reach the given persisted state.
+   * @param dn Datanode for which to check the state
+   * @param state The state to wait for.
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  private void waitForDnToReachPersistedOpState(DatanodeDetails dn,
+      HddsProtos.NodeOperationalState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(
+        () -> dn.getPersistedOpState().equals(state),
+        200, 30000);
+  }
+
 }

