You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/04/27 10:36:01 UTC

[ozone] branch master updated: HDDS-6598. Add a BackgroundPipelineScrubber to scrub all pipelines. (#3337)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a623ad72 HDDS-6598. Add a BackgroundPipelineScrubber to scrub all pipelines. (#3337)
d9a623ad72 is described below

commit d9a623ad724b05939960d53814152313413b4f82
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed Apr 27 18:35:56 2022 +0800

    HDDS-6598. Add a BackgroundPipelineScrubber to scrub all pipelines. (#3337)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   6 +
 .../common/src/main/resources/ozone-default.xml    |   9 ++
 .../scm/pipeline/BackgroundPipelineCreator.java    |  15 +-
 .../scm/pipeline/BackgroundPipelineScrubber.java   | 172 +++++++++++++++++++++
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   3 +-
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  30 +++-
 .../hdds/scm/pipeline/MockPipelineManager.java     |   3 +-
 .../pipeline/TestBackgroundPipelineScrubber.java   |  99 ++++++++++++
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |  39 +++--
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |   1 +
 10 files changed, 341 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 2a962d715f..fc83469d89 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -403,6 +403,12 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT =
       "120s";
 
+  public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL =
+      "ozone.scm.pipeline.scrub.interval";
+  public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
+      "5m";
+
+
   // Allow SCM to auto create factor ONE ratis pipeline.
   public static final String OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE =
       "ozone.scm.pipeline.creation.auto.factor.one";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 436d5ab39a..d67358650a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1368,6 +1368,15 @@
       create pipelines.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.pipeline.scrub.interval</name>
+    <value>5m</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      SCM schedules a fixed interval job using the configured interval to
+      scrub pipelines.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.pipeline.creation.auto.factor.one</name>
     <value>true</value>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6fed1e2fa4..c9eb68346f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -88,9 +87,7 @@ public class BackgroundPipelineCreator implements SCMService {
 
 
   BackgroundPipelineCreator(PipelineManager pipelineManager,
-                              ConfigurationSource conf,
-                              SCMServiceManager serviceManager,
-                              SCMContext scmContext) {
+      ConfigurationSource conf, SCMContext scmContext) {
     this.pipelineManager = pipelineManager;
     this.conf = conf;
     this.scmContext = scmContext;
@@ -109,9 +106,6 @@ public class BackgroundPipelineCreator implements SCMService {
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
 
-    // register BackgroundPipelineCreator to SCMServiceManager
-    serviceManager.register(this);
-
     // start RatisPipelineUtilsThread
     start();
   }
@@ -223,13 +217,6 @@ public class BackgroundPipelineCreator implements SCMService {
         continue;
       }
       list.add(replicationConfig);
-      if (!pipelineManager.getSafeModeStatus()) {
-        try {
-          pipelineManager.scrubPipeline(replicationConfig);
-        } catch (IOException e) {
-          LOG.error("Error while scrubbing pipelines.", e);
-        }
-      }
     }
 
     LoopingIterator it = new LoopingIterator(list);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
new file mode 100644
index 0000000000..2063ac37d6
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Background service to clean up pipelines with following conditions.
+ * - CLOSED
+ * - ALLOCATED for too long
+ */
+public class BackgroundPipelineScrubber implements SCMService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BackgroundPipelineScrubber.class);
+
+  private static final String THREAD_NAME = "PipelineScrubberThread";
+
+  private final PipelineManager pipelineManager;
+  private final ConfigurationSource conf;
+  private final SCMContext scmContext;
+
+  private final Lock serviceLock = new ReentrantLock();
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private Thread scrubThread;
+  private final long intervalInMillis;
+  private final long waitTimeInMillis;
+  private long lastTimeToBeReadyInMillis = 0;
+
+  public BackgroundPipelineScrubber(PipelineManager pipelineManager,
+      ConfigurationSource conf, SCMContext scmContext) {
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+    this.scmContext = scmContext;
+
+    this.intervalInMillis = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.waitTimeInMillis = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    start();
+  }
+
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+        if (serviceStatus != ServiceStatus.RUNNING) {
+          LOG.info("Service {} transitions to RUNNING.", getServiceName());
+          serviceStatus = ServiceStatus.RUNNING;
+          lastTimeToBeReadyInMillis = Time.monotonicNow();
+        }
+      } else {
+        if (serviceStatus != ServiceStatus.PAUSING) {
+          LOG.info("Service {} transitions to PAUSING.", getServiceName());
+          serviceStatus = ServiceStatus.PAUSING;
+        }
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      return serviceStatus == ServiceStatus.RUNNING &&
+          Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return BackgroundPipelineScrubber.class.getSimpleName();
+  }
+
+  @Override
+  public void start() {
+    if (!running.compareAndSet(false, true)) {
+      LOG.info("Pipeline Scrubber Service is already running, skip start.");
+      return;
+    }
+    LOG.info("Starting Pipeline Scrubber Service.");
+
+    scrubThread = new Thread(this::run);
+    scrubThread.setName(THREAD_NAME);
+    scrubThread.setDaemon(true);
+    scrubThread.start();
+  }
+
+  @Override
+  public void stop() {
+    synchronized (this) {
+      if (!running.compareAndSet(true, false)) {
+        LOG.info("Pipeline Scrubber Service is not running, skip stop.");
+        return;
+      }
+      notifyAll();
+    }
+    LOG.info("Stopping Pipeline Scrubber Service.");
+  }
+
+  @VisibleForTesting
+  public boolean getRunning() {
+    return running.get();
+  }
+
+  private void run() {
+    while (running.get()) {
+      try {
+        if (shouldRun()) {
+          scrubAllPipelines();
+        }
+        synchronized (this) {
+          wait(intervalInMillis);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("{} is interrupted, exit", THREAD_NAME);
+        Thread.currentThread().interrupt();
+        running.set(false);
+      }
+    }
+  }
+
+  private void scrubAllPipelines() {
+    try {
+      pipelineManager.scrubPipelines();
+    } catch (IOException e) {
+      LOG.error("Unexpected error during pipeline scrubbing", e);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 6a50876919..ffce3146f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -111,8 +111,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
 
   void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException;
 
-  void scrubPipeline(ReplicationConfig replicationConfig)
-      throws IOException;
+  void scrubPipelines() throws IOException;
 
   void startPipelineCreator();
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 19b7a51d6c..83b037cb2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -76,6 +76,7 @@ public class PipelineManagerImpl implements PipelineManager {
   private PipelineFactory pipelineFactory;
   private PipelineStateManager stateManager;
   private BackgroundPipelineCreator backgroundPipelineCreator;
+  private BackgroundPipelineScrubber backgroundPipelineScrubber;
   private final ConfigurationSource conf;
   private final EventPublisher eventPublisher;
   // Pipeline Manager MXBean
@@ -141,10 +142,16 @@ public class PipelineManagerImpl implements PipelineManager {
 
     // Create background thread.
     BackgroundPipelineCreator backgroundPipelineCreator =
-        new BackgroundPipelineCreator(
-            pipelineManager, conf, serviceManager, scmContext);
+        new BackgroundPipelineCreator(pipelineManager, conf, scmContext);
+
+    BackgroundPipelineScrubber backgroundPipelineScrubber =
+        new BackgroundPipelineScrubber(pipelineManager, conf, scmContext);
 
     pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+    pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
+
+    serviceManager.register(backgroundPipelineCreator);
+    serviceManager.register(backgroundPipelineScrubber);
 
     return pipelineManager;
   }
@@ -408,15 +415,14 @@ public class PipelineManagerImpl implements PipelineManager {
    * Scrub pipelines.
    */
   @Override
-  public void scrubPipeline(ReplicationConfig config)
-      throws IOException {
+  public void scrubPipelines() throws IOException {
     Instant currentTime = Instant.now();
     Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
 
-    List<Pipeline> candidates = stateManager.getPipelines(config);
+    List<Pipeline> candidates = stateManager.getPipelines();
 
     for (Pipeline p : candidates) {
       // scrub pipelines who stay ALLOCATED for too long.
@@ -437,7 +443,6 @@ public class PipelineManagerImpl implements PipelineManager {
         removePipeline(p);
       }
     }
-    return;
   }
 
   /**
@@ -591,6 +596,9 @@ public class PipelineManagerImpl implements PipelineManager {
     if (backgroundPipelineCreator != null) {
       backgroundPipelineCreator.stop();
     }
+    if (backgroundPipelineScrubber != null) {
+      backgroundPipelineScrubber.stop();
+    }
 
     if (pmInfoBean != null) {
       MBeans.unregister(this.pmInfoBean);
@@ -639,6 +647,16 @@ public class PipelineManagerImpl implements PipelineManager {
     return this.backgroundPipelineCreator;
   }
 
+  private void setBackgroundPipelineScrubber(
+      BackgroundPipelineScrubber backgroundPipelineScrubber) {
+    this.backgroundPipelineScrubber = backgroundPipelineScrubber;
+  }
+
+  @VisibleForTesting
+  public BackgroundPipelineScrubber getBackgroundPipelineScrubber() {
+    return this.backgroundPipelineScrubber;
+  }
+
   @VisibleForTesting
   public PipelineFactory getPipelineFactory() {
     return pipelineFactory;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 523efc0735..932d2d5387 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -218,8 +218,7 @@ public class MockPipelineManager implements PipelineManager {
   }
 
   @Override
-  public void scrubPipeline(ReplicationConfig replicationConfig)
-      throws IOException {
+  public void scrubPipelines() {
 
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
new file mode 100644
index 0000000000..dfa528454e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests covering the lifecycle of {@link BackgroundPipelineScrubber}.
+ */
+public class TestBackgroundPipelineScrubber {
+
+  private OzoneConfiguration configuration;
+  private SCMContext context;
+  private PipelineManager mockPipelineManager;
+  private BackgroundPipelineScrubber pipelineScrubber;
+
+  @Before
+  public void setup() throws IOException {
+    context = SCMContext.emptyContext();
+    mockPipelineManager = mock(PipelineManager.class);
+    doNothing().when(mockPipelineManager).scrubPipelines();
+
+    // Zero wait time: the scrubber may run right after leaving safe mode.
+    configuration = new OzoneConfiguration();
+    configuration.set(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0ms");
+    pipelineScrubber = new BackgroundPipelineScrubber(
+        mockPipelineManager, configuration, context);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    pipelineScrubber.stop();
+  }
+
+  @Test
+  public void testStop() {
+    assertTrue(pipelineScrubber.getRunning());
+    pipelineScrubber.stop();
+    assertFalse(pipelineScrubber.getRunning());
+  }
+
+  @Test
+  public void testNotifyStatusChanged() {
+    // freshly constructed: still PAUSING
+    assertFalse(pipelineScrubber.shouldRun());
+
+    // first notification with the initial context: PAUSING -> RUNNING
+    pipelineScrubber.notifyStatusChanged();
+    assertTrue(pipelineScrubber.shouldRun());
+
+    // entering safe mode flips it back: RUNNING -> PAUSING
+    context.updateSafeModeStatus(new SafeModeStatus(true, true));
+    pipelineScrubber.notifyStatusChanged();
+    assertFalse(pipelineScrubber.shouldRun());
+  }
+
+  @Test
+  public void testRun() throws IOException {
+    assertFalse(pipelineScrubber.shouldRun());
+    // Switch to RUNNING and wake the scrub thread while holding its monitor.
+    synchronized (pipelineScrubber) {
+      pipelineScrubber.notifyStatusChanged();
+      assertTrue(pipelineScrubber.shouldRun());
+      pipelineScrubber.notifyAll();
+    }
+    verify(mockPipelineManager, timeout(3000).times(1)).scrubPipelines();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index fd29929c13..64ef02eb37 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -535,7 +535,7 @@ public class TestPipelineManagerImpl {
   }
 
   @Test
-  public void testScrubPipeline() throws Exception {
+  public void testScrubPipelines() throws Exception {
     // No timeout for pipeline scrubber.
     conf.setTimeDuration(
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -543,33 +543,50 @@ public class TestPipelineManagerImpl {
 
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
     pipelineManager.setScmContext(scmContext);
-    Pipeline pipeline = pipelineManager
+    Pipeline allocatedPipeline = pipelineManager
         .createPipeline(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE));
     // At this point, pipeline is not at OPEN stage.
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
-        pipeline.getPipelineState());
+        allocatedPipeline.getPipelineState());
 
     // pipeline should be seen in pipelineManager as ALLOCATED.
     Assert.assertTrue(pipelineManager
         .getPipelines(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE),
-            Pipeline.PipelineState.ALLOCATED).contains(pipeline));
-    pipelineManager
-        .scrubPipeline(RatisReplicationConfig
+            Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
+
+    Pipeline closedPipeline = pipelineManager
+        .createPipeline(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE));
+    pipelineManager.openPipeline(closedPipeline.getId());
+    pipelineManager.closePipeline(closedPipeline, true);
+
+    // pipeline should be seen in pipelineManager as CLOSED.
+    Assert.assertTrue(pipelineManager
+        .getPipelines(RatisReplicationConfig
+                .getInstance(ReplicationFactor.THREE),
+            Pipeline.PipelineState.CLOSED).contains(closedPipeline));
 
-    // pipeline should be scrubbed.
+    pipelineManager.scrubPipelines();
+
+    // The allocatedPipeline should be scrubbed.
     Assert.assertFalse(pipelineManager
         .getPipelines(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE),
-            Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+            Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
+
+    // The closedPipeline should be scrubbed.
+    Assert.assertFalse(pipelineManager
+        .getPipelines(RatisReplicationConfig
+                .getInstance(ReplicationFactor.THREE),
+            Pipeline.PipelineState.CLOSED).contains(closedPipeline));
 
     pipelineManager.close();
   }
 
   @Test
-  public void testScrubPipelineShouldFailOnFollower() throws Exception {
+  public void testScrubPipelinesShouldFailOnFollower() throws Exception {
     // No timeout for pipeline scrubber.
     conf.setTimeDuration(
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -595,9 +612,7 @@ public class TestPipelineManagerImpl {
     ((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false);
 
     try {
-      pipelineManager
-          .scrubPipeline(RatisReplicationConfig
-              .getInstance(ReplicationFactor.THREE));
+      pipelineManager.scrubPipelines();
     } catch (NotLeaderException ex) {
       pipelineManager.close();
       return;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 1062274651..823c2ddf9f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -354,6 +354,7 @@ public class TestSCMSafeModeManager {
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     pipelineManager.getBackgroundPipelineCreator().stop();
+    pipelineManager.getBackgroundPipelineScrubber().stop();
 
     for (int i = 0; i < pipelineCount; i++) {
       // Create pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org