You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/10/20 13:09:30 UTC

[ozone] 02/04: HDDS-7341. EC: Close pipelines with unregistered nodes (#3850)

This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch ozone-1.3
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 36490993db7fd123fe29e63848da6d4185aa7cbf
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Oct 19 09:53:30 2022 +0100

    HDDS-7341. EC: Close pipelines with unregistered nodes (#3850)
---
 .../hdds/scm/pipeline/PipelineManagerImpl.java     | 40 ++++++++++++++++++---
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 41 +++++++++++++++++-----
 2 files changed, 68 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index aaa3088f9a..044cbc0166 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -509,22 +509,57 @@ public class PipelineManagerImpl implements PipelineManager {
       if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
           (currentTime.toEpochMilli() - p.getCreationTimestamp()
               .toEpochMilli() >= pipelineScrubTimeoutInMills)) {
-        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
-            " since it stays at ALLOCATED stage for " +
-            Duration.between(currentTime, p.getCreationTimestamp())
-                .toMinutes() + " mins.");
+        // Duration.between(start, end) computes end - start, so the creation
+        // time must come first for the logged age to be positive.
+        LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " +
+            "stage for {} mins.", p.getId(),
+            Duration.between(p.getCreationTimestamp(), currentTime)
+                .toMinutes());
         closePipeline(p, false);
       }
       // scrub pipelines who stay CLOSED for too long.
       if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
-        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
-            " since it stays at CLOSED stage.");
+        LOG.info("Scrubbing pipeline: id: {} since it stays at CLOSED stage.",
+            p.getId());
         closeContainersForPipeline(p.getId());
         removePipeline(p);
       }
+      // If a datanode is stopped and then SCM is restarted, a pipeline can get
+      // stuck in an open state. For Ratis, provided some other DNs that were
+      // part of the open pipeline register to SCM after the restart, the Ratis
+      // pipeline close will get triggered by the DNs. For EC that will never
+      // happen, as the DNs are not aware of the pipeline. Therefore we should
+      // close any pipelines in the scrubber if they have nodes which are not
+      // registered.
+      if (isOpenWithUnregisteredNodes(p)) {
+        LOG.info("Scrubbing pipeline: id: {} as it has unregistered nodes",
+            p.getId());
+        closeContainersForPipeline(p.getId());
+        closePipeline(p, true);
+      }
     }
   }
 
+  /**
+   * Checks whether an open pipeline contains any datanode that the node
+   * manager cannot resolve by UUID, i.e. a node that has not registered.
+   *
+   * @param pipeline The pipeline to check
+   * @return True if the pipeline is open and contains unregistered nodes. False
+   *         otherwise.
+   */
+  private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) {
+    if (!pipeline.isOpen()) {
+      return false;
+    }
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      if (nodeManager.getNodeByUuid(dn.getUuidString()) == null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Schedules a fixed interval job to create pipelines.
    */
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 8fa45f5842..1ed9b845ac 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -29,12 +30,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
@@ -125,6 +128,10 @@ public class TestPipelineManagerImpl {
         GenericTestUtils.getRandomizedTempPath());
     scm = HddsTestUtils.getScm(conf);
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    // Mock Node Manager is not able to correctly set up things for the EC
+    // placement policy (Rack Scatter), so just use the random one.
+    conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY,
+        SCMContainerPlacementRandom.class.getName());
     dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
     nodeManager = new MockNodeManager(true, 20);
     maxPipelineCount = nodeManager.getNodeCount(
@@ -151,7 +158,7 @@ public class TestPipelineManagerImpl {
       throws IOException {
     return PipelineManagerImpl.newPipelineManager(conf,
         SCMHAManagerStub.getInstance(isLeader),
-        new MockNodeManager(true, 20),
+        nodeManager,
         SCMDBDefinition.PIPELINES.getTable(dbStore),
         new EventQueue(),
         scmContext,
@@ -163,7 +170,7 @@ public class TestPipelineManagerImpl {
       boolean isLeader, SCMHADBTransactionBuffer buffer) throws IOException {
     return PipelineManagerImpl.newPipelineManager(conf,
         SCMHAManagerStub.getInstance(isLeader, buffer),
-        new MockNodeManager(true, 20),
+        nodeManager,
         SCMDBDefinition.PIPELINES.getTable(dbStore),
         new EventQueue(),
         SCMContext.emptyContext(),
@@ -341,7 +348,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testRemovePipeline() throws Exception {
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
-    pipelineManager.setScmContext(scmContext);
     // Create a pipeline
     Pipeline pipeline = pipelineManager.createPipeline(
         RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
@@ -391,7 +397,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testClosePipelineShouldFailOnFollower() throws Exception {
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
-    pipelineManager.setScmContext(scmContext);
     Pipeline pipeline = pipelineManager.createPipeline(
         RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
     Assertions.assertEquals(1, pipelineManager.getPipelines().size());
@@ -413,7 +418,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testPipelineReport() throws Exception {
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
-    pipelineManager.setScmContext(scmContext);
     SCMSafeModeManager scmSafeModeManager =
         new SCMSafeModeManager(conf, new ArrayList<>(), null, pipelineManager,
             new EventQueue(), serviceManager, scmContext);
@@ -571,7 +575,6 @@ public class TestPipelineManagerImpl {
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 50, TimeUnit.SECONDS);
 
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
-    pipelineManager.setScmContext(scmContext);
     Pipeline allocatedPipeline = pipelineManager
         .createPipeline(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE));
@@ -628,13 +631,39 @@ public class TestPipelineManagerImpl {
     pipelineManager.close();
   }
 
+  @Test
+  public void testScrubOpenWithUnregisteredNodes() throws Exception {
+    PipelineManagerImpl pipelineManager = createPipelineManager(true);
+    Pipeline pipeline = pipelineManager
+        .createPipeline(new ECReplicationConfig(3, 2));
+    pipelineManager.openPipeline(pipeline.getId());
+
+    // Scrubbing the pipelines should not affect this pipeline
+    pipelineManager.scrubPipelines();
+    pipeline = pipelineManager.getPipeline(pipeline.getId());
+    Assertions.assertEquals(Pipeline.PipelineState.OPEN,
+        pipeline.getPipelineState());
+
+    // Now, "unregister" one of the nodes in the pipeline
+    DatanodeDetails firstDN = nodeManager.getNodeByUuid(
+        pipeline.getNodes().get(0).getUuidString());
+    nodeManager.getClusterNetworkTopologyMap().remove(firstDN);
+    // The mock node manager must now report the node as unregistered.
+    Assertions.assertNull(nodeManager.getNodeByUuid(firstDN.getUuidString()));
+
+    pipelineManager.scrubPipelines();
+    pipeline = pipelineManager.getPipeline(pipeline.getId());
+    Assertions.assertEquals(Pipeline.PipelineState.CLOSED,
+        pipeline.getPipelineState());
+    pipelineManager.close();
+  }
+
   @Test
   public void testScrubPipelinesShouldFailOnFollower() throws Exception {
     conf.setTimeDuration(
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 10, TimeUnit.SECONDS);
 
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
-    pipelineManager.setScmContext(scmContext);
     Pipeline pipeline = pipelineManager
         .createPipeline(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE));
@@ -765,7 +794,6 @@ public class TestPipelineManagerImpl {
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
             .captureLogs(LoggerFactory.getLogger(PipelineManagerImpl.class));
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
-    pipelineManager.setScmContext(scmContext);
     Pipeline pipeline = pipelineManager.createPipeline(
             RatisReplicationConfig
                 .getInstance(HddsProtos.ReplicationFactor.THREE));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org