You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/04/27 10:36:01 UTC
[ozone] branch master updated: HDDS-6598. Add a BackgroundPipelineScrubber to scrub all pipelines. (#3337)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d9a623ad72 HDDS-6598. Add a BackgroundPipelineScrubber to scrub all pipelines. (#3337)
d9a623ad72 is described below
commit d9a623ad724b05939960d53814152313413b4f82
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed Apr 27 18:35:56 2022 +0800
HDDS-6598. Add a BackgroundPipelineScrubber to scrub all pipelines. (#3337)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 6 +
.../common/src/main/resources/ozone-default.xml | 9 ++
.../scm/pipeline/BackgroundPipelineCreator.java | 15 +-
.../scm/pipeline/BackgroundPipelineScrubber.java | 172 +++++++++++++++++++++
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 3 +-
.../hdds/scm/pipeline/PipelineManagerImpl.java | 30 +++-
.../hdds/scm/pipeline/MockPipelineManager.java | 3 +-
.../pipeline/TestBackgroundPipelineScrubber.java | 99 ++++++++++++
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 39 +++--
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 1 +
10 files changed, 341 insertions(+), 36 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 2a962d715f..fc83469d89 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -403,6 +403,12 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT =
"120s";
+ public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL =
+ "ozone.scm.pipeline.scrub.interval";
+ public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
+ "5m";
+
+
// Allow SCM to auto create factor ONE ratis pipeline.
public static final String OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE =
"ozone.scm.pipeline.creation.auto.factor.one";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 436d5ab39a..d67358650a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1368,6 +1368,15 @@
create pipelines.
</description>
</property>
+ <property>
+ <name>ozone.scm.pipeline.scrub.interval</name>
+ <value>5m</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>
+ SCM schedules a fixed interval job using the configured interval to
+ scrub pipelines.
+ </description>
+ </property>
<property>
<name>ozone.scm.pipeline.creation.auto.factor.one</name>
<value>true</value>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6fed1e2fa4..c9eb68346f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@@ -88,9 +87,7 @@ public class BackgroundPipelineCreator implements SCMService {
BackgroundPipelineCreator(PipelineManager pipelineManager,
- ConfigurationSource conf,
- SCMServiceManager serviceManager,
- SCMContext scmContext) {
+ ConfigurationSource conf, SCMContext scmContext) {
this.pipelineManager = pipelineManager;
this.conf = conf;
this.scmContext = scmContext;
@@ -109,9 +106,6 @@ public class BackgroundPipelineCreator implements SCMService {
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
- // register BackgroundPipelineCreator to SCMServiceManager
- serviceManager.register(this);
-
// start RatisPipelineUtilsThread
start();
}
@@ -223,13 +217,6 @@ public class BackgroundPipelineCreator implements SCMService {
continue;
}
list.add(replicationConfig);
- if (!pipelineManager.getSafeModeStatus()) {
- try {
- pipelineManager.scrubPipeline(replicationConfig);
- } catch (IOException e) {
- LOG.error("Error while scrubbing pipelines.", e);
- }
- }
}
LoopingIterator it = new LoopingIterator(list);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
new file mode 100644
index 0000000000..2063ac37d6
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Background service to clean up pipelines with following conditions.
+ * - CLOSED
+ * - ALLOCATED for too long
+ */
+public class BackgroundPipelineScrubber implements SCMService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BackgroundPipelineScrubber.class);
+
+ private static final String THREAD_NAME = "PipelineScrubberThread";
+
+ private final PipelineManager pipelineManager;
+ private final ConfigurationSource conf;
+ private final SCMContext scmContext;
+
+ private final Lock serviceLock = new ReentrantLock();
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private Thread scrubThread;
+ private final long intervalInMillis;
+ private final long waitTimeInMillis;
+ private long lastTimeToBeReadyInMillis = 0;
+
+ public BackgroundPipelineScrubber(PipelineManager pipelineManager,
+ ConfigurationSource conf, SCMContext scmContext) {
+ this.pipelineManager = pipelineManager;
+ this.conf = conf;
+ this.scmContext = scmContext;
+
+ this.intervalInMillis = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.waitTimeInMillis = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ start();
+ }
+
+ @Override
+ public void notifyStatusChanged() {
+ serviceLock.lock();
+ try {
+ if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+ if (serviceStatus != ServiceStatus.RUNNING) {
+ LOG.info("Service {} transitions to RUNNING.", getServiceName());
+ serviceStatus = ServiceStatus.RUNNING;
+ lastTimeToBeReadyInMillis = Time.monotonicNow();
+ }
+ } else {
+ if (serviceStatus != ServiceStatus.PAUSING) {
+ LOG.info("Service {} transitions to PAUSING.", getServiceName());
+ serviceStatus = ServiceStatus.PAUSING;
+ }
+ }
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean shouldRun() {
+ serviceLock.lock();
+ try {
+ // If safe mode is off, then this SCMService starts to run with a delay.
+ return serviceStatus == ServiceStatus.RUNNING &&
+ Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return BackgroundPipelineScrubber.class.getSimpleName();
+ }
+
+ @Override
+ public void start() {
+ if (!running.compareAndSet(false, true)) {
+ LOG.info("Pipeline Scrubber Service is already running, skip start.");
+ return;
+ }
+ LOG.info("Starting Pipeline Scrubber Service.");
+
+ scrubThread = new Thread(this::run);
+ scrubThread.setName(THREAD_NAME);
+ scrubThread.setDaemon(true);
+ scrubThread.start();
+ }
+
+ @Override
+ public void stop() {
+ synchronized (this) {
+ if (!running.compareAndSet(true, false)) {
+ LOG.info("Pipeline Scrubber Service is not running, skip stop.");
+ return;
+ }
+ notifyAll();
+ }
+ LOG.info("Stopping Pipeline Scrubber Service.");
+ }
+
+ @VisibleForTesting
+ public boolean getRunning() {
+ return running.get();
+ }
+
+ private void run() {
+ while (running.get()) {
+ try {
+ if (shouldRun()) {
+ scrubAllPipelines();
+ }
+ synchronized (this) {
+ wait(intervalInMillis);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} is interrupted, exit", THREAD_NAME);
+ Thread.currentThread().interrupt();
+ running.set(false);
+ }
+ }
+ }
+
+ private void scrubAllPipelines() {
+ try {
+ pipelineManager.scrubPipelines();
+ } catch (IOException e) {
+ LOG.error("Unexpected error during pipeline scrubbing", e);
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 6a50876919..ffce3146f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -111,8 +111,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException;
- void scrubPipeline(ReplicationConfig replicationConfig)
- throws IOException;
+ void scrubPipelines() throws IOException;
void startPipelineCreator();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 19b7a51d6c..83b037cb2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -76,6 +76,7 @@ public class PipelineManagerImpl implements PipelineManager {
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private BackgroundPipelineCreator backgroundPipelineCreator;
+ private BackgroundPipelineScrubber backgroundPipelineScrubber;
private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
// Pipeline Manager MXBean
@@ -141,10 +142,16 @@ public class PipelineManagerImpl implements PipelineManager {
// Create background thread.
BackgroundPipelineCreator backgroundPipelineCreator =
- new BackgroundPipelineCreator(
- pipelineManager, conf, serviceManager, scmContext);
+ new BackgroundPipelineCreator(pipelineManager, conf, scmContext);
+
+ BackgroundPipelineScrubber backgroundPipelineScrubber =
+ new BackgroundPipelineScrubber(pipelineManager, conf, scmContext);
pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+ pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
+
+ serviceManager.register(backgroundPipelineCreator);
+ serviceManager.register(backgroundPipelineScrubber);
return pipelineManager;
}
@@ -408,15 +415,14 @@ public class PipelineManagerImpl implements PipelineManager {
* Scrub pipelines.
*/
@Override
- public void scrubPipeline(ReplicationConfig config)
- throws IOException {
+ public void scrubPipelines() throws IOException {
Instant currentTime = Instant.now();
Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- List<Pipeline> candidates = stateManager.getPipelines(config);
+ List<Pipeline> candidates = stateManager.getPipelines();
for (Pipeline p : candidates) {
// scrub pipelines who stay ALLOCATED for too long.
@@ -437,7 +443,6 @@ public class PipelineManagerImpl implements PipelineManager {
removePipeline(p);
}
}
- return;
}
/**
@@ -591,6 +596,9 @@ public class PipelineManagerImpl implements PipelineManager {
if (backgroundPipelineCreator != null) {
backgroundPipelineCreator.stop();
}
+ if (backgroundPipelineScrubber != null) {
+ backgroundPipelineScrubber.stop();
+ }
if (pmInfoBean != null) {
MBeans.unregister(this.pmInfoBean);
@@ -639,6 +647,16 @@ public class PipelineManagerImpl implements PipelineManager {
return this.backgroundPipelineCreator;
}
+ private void setBackgroundPipelineScrubber(
+ BackgroundPipelineScrubber backgroundPipelineScrubber) {
+ this.backgroundPipelineScrubber = backgroundPipelineScrubber;
+ }
+
+ @VisibleForTesting
+ public BackgroundPipelineScrubber getBackgroundPipelineScrubber() {
+ return this.backgroundPipelineScrubber;
+ }
+
@VisibleForTesting
public PipelineFactory getPipelineFactory() {
return pipelineFactory;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 523efc0735..932d2d5387 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -218,8 +218,7 @@ public class MockPipelineManager implements PipelineManager {
}
@Override
- public void scrubPipeline(ReplicationConfig replicationConfig)
- throws IOException {
+ public void scrubPipelines() {
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
new file mode 100644
index 0000000000..dfa528454e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for {@link BackgroundPipelineScrubber}.
+ */
+public class TestBackgroundPipelineScrubber {
+
+ private BackgroundPipelineScrubber scrubber;
+ private SCMContext scmContext;
+ private PipelineManager pipelineManager;
+ private OzoneConfiguration conf;
+
+ @Before
+ public void setup() throws IOException {
+ this.scmContext = SCMContext.emptyContext();
+ this.pipelineManager = mock(PipelineManager.class);
+ doNothing().when(pipelineManager).scrubPipelines();
+
+ // no initial delay after exit safe mode
+ this.conf = new OzoneConfiguration();
+ conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0ms");
+
+ this.scrubber = new BackgroundPipelineScrubber(pipelineManager, conf,
+ scmContext);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ scrubber.stop();
+ }
+
+ @Test
+ public void testStop() {
+ assertTrue(scrubber.getRunning());
+ scrubber.stop();
+ assertFalse(scrubber.getRunning());
+ }
+
+ @Test
+ public void testNotifyStatusChanged() {
+ // init at PAUSING
+ assertFalse(scrubber.shouldRun());
+
+ // out of safe mode, PAUSING -> RUNNING
+ scrubber.notifyStatusChanged();
+ assertTrue(scrubber.shouldRun());
+
+ // go into safe mode, RUNNING -> PAUSING
+ scmContext.updateSafeModeStatus(new SafeModeStatus(true, true));
+ scrubber.notifyStatusChanged();
+ assertFalse(scrubber.shouldRun());
+ }
+
+ @Test
+ public void testRun() throws IOException {
+ assertFalse(scrubber.shouldRun());
+ // kick a run
+ synchronized (scrubber) {
+ scrubber.notifyStatusChanged();
+ assertTrue(scrubber.shouldRun());
+ scrubber.notifyAll();
+ }
+ verify(pipelineManager, timeout(3000).times(1)).scrubPipelines();
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index fd29929c13..64ef02eb37 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -535,7 +535,7 @@ public class TestPipelineManagerImpl {
}
@Test
- public void testScrubPipeline() throws Exception {
+ public void testScrubPipelines() throws Exception {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -543,33 +543,50 @@ public class TestPipelineManagerImpl {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
- Pipeline pipeline = pipelineManager
+ Pipeline allocatedPipeline = pipelineManager
.createPipeline(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE));
// At this point, pipeline is not at OPEN stage.
Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
- pipeline.getPipelineState());
+ allocatedPipeline.getPipelineState());
// pipeline should be seen in pipelineManager as ALLOCATED.
Assert.assertTrue(pipelineManager
.getPipelines(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE),
- Pipeline.PipelineState.ALLOCATED).contains(pipeline));
- pipelineManager
- .scrubPipeline(RatisReplicationConfig
+ Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
+
+ Pipeline closedPipeline = pipelineManager
+ .createPipeline(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE));
+ pipelineManager.openPipeline(closedPipeline.getId());
+ pipelineManager.closePipeline(closedPipeline, true);
+
+ // pipeline should be seen in pipelineManager as CLOSED.
+ Assert.assertTrue(pipelineManager
+ .getPipelines(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
+ Pipeline.PipelineState.CLOSED).contains(closedPipeline));
- // pipeline should be scrubbed.
+ pipelineManager.scrubPipelines();
+
+ // The allocatedPipeline should be scrubbed.
Assert.assertFalse(pipelineManager
.getPipelines(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE),
- Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+ Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
+
+ // The closedPipeline should be scrubbed.
+ Assert.assertFalse(pipelineManager
+ .getPipelines(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
+ Pipeline.PipelineState.CLOSED).contains(closedPipeline));
pipelineManager.close();
}
@Test
- public void testScrubPipelineShouldFailOnFollower() throws Exception {
+ public void testScrubPipelinesShouldFailOnFollower() throws Exception {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -595,9 +612,7 @@ public class TestPipelineManagerImpl {
((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false);
try {
- pipelineManager
- .scrubPipeline(RatisReplicationConfig
- .getInstance(ReplicationFactor.THREE));
+ pipelineManager.scrubPipelines();
} catch (NotLeaderException ex) {
pipelineManager.close();
return;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 1062274651..823c2ddf9f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -354,6 +354,7 @@ public class TestSCMSafeModeManager {
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
pipelineManager.getBackgroundPipelineCreator().stop();
+ pipelineManager.getBackgroundPipelineScrubber().stop();
for (int i = 0; i < pipelineCount; i++) {
// Create pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org