You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ja...@apache.org on 2022/05/31 12:44:54 UTC
[ozone] branch master updated: HDDS-6804. use common backgroundScmService for backgroundPipelineScrubber (#3459)
This is an automated email from the ASF dual-hosted git repository.
jacksonyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f51a0d543f HDDS-6804. use common backgroundScmService for backgroundPipelineScrubber (#3459)
f51a0d543f is described below
commit f51a0d543f1bdc13829ffa60b62cb61f2256e1c8
Author: Jie Yao <ja...@tencent.com>
AuthorDate: Tue May 31 20:44:49 2022 +0800
HDDS-6804. use common backgroundScmService for backgroundPipelineScrubber (#3459)
---
.../scm/pipeline/BackgroundPipelineScrubber.java | 183 ---------------------
.../hdds/scm/pipeline/PipelineManagerImpl.java | 32 +++-
.../pipeline/TestBackgroundPipelineScrubber.java | 109 ------------
3 files changed, 26 insertions(+), 298 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
deleted file mode 100644
index 3dc5f4177f..0000000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.Clock;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Background service to clean up pipelines with following conditions.
- * - CLOSED
- * - ALLOCATED for too long
- */
-public class BackgroundPipelineScrubber implements SCMService {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(BackgroundPipelineScrubber.class);
-
- private static final String THREAD_NAME = "PipelineScrubberThread";
-
- private final PipelineManager pipelineManager;
- private final ConfigurationSource conf;
- private final SCMContext scmContext;
-
- private final Lock serviceLock = new ReentrantLock();
- private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
-
- private final AtomicBoolean running = new AtomicBoolean(false);
- private Thread scrubThread;
- private final long intervalInMillis;
- private final long waitTimeInMillis;
- private long lastTimeToBeReadyInMillis = 0;
- private volatile boolean runImmediately = false;
- private final Clock clock;
-
- public BackgroundPipelineScrubber(PipelineManager pipelineManager,
- ConfigurationSource conf, SCMContext scmContext, Clock clock) {
- this.pipelineManager = pipelineManager;
- this.conf = conf;
- this.scmContext = scmContext;
- this.clock = clock;
-
- this.intervalInMillis = conf.getTimeDuration(
- ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
- ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
- TimeUnit.MILLISECONDS);
- this.waitTimeInMillis = conf.getTimeDuration(
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
- TimeUnit.MILLISECONDS);
-
- start();
- }
-
- @Override
- public void notifyStatusChanged() {
- serviceLock.lock();
- try {
- if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
- if (serviceStatus != ServiceStatus.RUNNING) {
- LOG.info("Service {} transitions to RUNNING.", getServiceName());
- serviceStatus = ServiceStatus.RUNNING;
- lastTimeToBeReadyInMillis = clock.millis();
- }
- } else {
- if (serviceStatus != ServiceStatus.PAUSING) {
- LOG.info("Service {} transitions to PAUSING.", getServiceName());
- serviceStatus = ServiceStatus.PAUSING;
- }
- }
- } finally {
- serviceLock.unlock();
- }
- }
-
- @Override
- public boolean shouldRun() {
- serviceLock.lock();
- try {
- // If safe mode is off, then this SCMService starts to run with a delay.
- return serviceStatus == ServiceStatus.RUNNING &&
- clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
- } finally {
- serviceLock.unlock();
- }
- }
-
- @Override
- public String getServiceName() {
- return BackgroundPipelineScrubber.class.getSimpleName();
- }
-
- @Override
- public void start() {
- if (!running.compareAndSet(false, true)) {
- LOG.info("{} Service is already running, skip start.", getServiceName());
- return;
- }
- LOG.info("Starting {} Service.", getServiceName());
-
- scrubThread = new Thread(this::run);
- scrubThread.setName(THREAD_NAME);
- scrubThread.setDaemon(true);
- scrubThread.start();
- }
-
- @Override
- public void stop() {
- synchronized (this) {
- if (!running.compareAndSet(true, false)) {
- LOG.info("{} Service is not running, skip stop.", getServiceName());
- return;
- }
- scrubThread.interrupt();
- }
- LOG.info("Stopping {} Service.", getServiceName());
- }
-
- @VisibleForTesting
- public boolean getRunning() {
- return running.get();
- }
-
- private void run() {
- while (running.get()) {
- try {
- if (shouldRun()) {
- scrubAllPipelines();
- }
- synchronized (this) {
- if (!runImmediately) {
- wait(intervalInMillis);
- }
- runImmediately = false;
- }
- } catch (InterruptedException e) {
- LOG.warn("{} is interrupted, exit", THREAD_NAME);
- Thread.currentThread().interrupt();
- running.set(false);
- }
- }
- }
-
- public synchronized void runImmediately() {
- runImmediately = true;
- notify();
- }
-
- private void scrubAllPipelines() {
- try {
- pipelineManager.scrubPipelines();
- } catch (IOException e) {
- LOG.error("Unexpected error during pipeline scrubbing", e);
- }
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 1cc4b45261..4cb96b1d2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
@@ -76,7 +77,7 @@ public class PipelineManagerImpl implements PipelineManager {
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private BackgroundPipelineCreator backgroundPipelineCreator;
- private BackgroundPipelineScrubber backgroundPipelineScrubber;
+ private BackgroundSCMService backgroundPipelineScrubber;
private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
// Pipeline Manager MXBean
@@ -151,9 +152,28 @@ public class PipelineManagerImpl implements PipelineManager {
BackgroundPipelineCreator backgroundPipelineCreator =
new BackgroundPipelineCreator(pipelineManager, conf, scmContext, clock);
- BackgroundPipelineScrubber backgroundPipelineScrubber =
- new BackgroundPipelineScrubber(pipelineManager, conf, scmContext,
- clock);
+ final long scrubberIntervalInMillis = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ final long safeModeWaitMs = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ BackgroundSCMService backgroundPipelineScrubber =
+ new BackgroundSCMService.Builder().setClock(clock)
+ .setScmContext(scmContext)
+ .setServiceName("BackgroundPipelineScrubber")
+ .setIntervalInMillis(scrubberIntervalInMillis)
+ .setWaitTimeInMillis(safeModeWaitMs)
+ .setPeriodicalTask(() -> {
+ try {
+ pipelineManager.scrubPipelines();
+ } catch (IOException e) {
+ LOG.error("Unexpected error during pipeline scrubbing", e);
+ }
+ }).build();
pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
@@ -656,12 +676,12 @@ public class PipelineManagerImpl implements PipelineManager {
}
private void setBackgroundPipelineScrubber(
- BackgroundPipelineScrubber backgroundPipelineScrubber) {
+ BackgroundSCMService backgroundPipelineScrubber) {
this.backgroundPipelineScrubber = backgroundPipelineScrubber;
}
@VisibleForTesting
- public BackgroundPipelineScrubber getBackgroundPipelineScrubber() {
+ public BackgroundSCMService getBackgroundPipelineScrubber() {
return this.backgroundPipelineScrubber;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
deleted file mode 100644
index d60cb2294e..0000000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.ozone.test.TestClock;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.ZoneOffset;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-
-/**
- * Test for {@link BackgroundPipelineScrubber}.
- */
-public class TestBackgroundPipelineScrubber {
-
- private BackgroundPipelineScrubber scrubber;
- private SCMContext scmContext;
- private PipelineManager pipelineManager;
- private OzoneConfiguration conf;
- private TestClock testClock;
-
- @BeforeEach
- public void setup() throws IOException {
- testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
- this.scmContext = SCMContext.emptyContext();
- this.pipelineManager = mock(PipelineManager.class);
- doNothing().when(pipelineManager).scrubPipelines();
-
- this.conf = new OzoneConfiguration();
- conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "50s");
-
- this.scrubber = new BackgroundPipelineScrubber(pipelineManager, conf,
- scmContext, testClock);
- }
-
- @AfterEach
- public void teardown() throws IOException {
- scrubber.stop();
- }
-
- @Test
- public void testStop() {
- assertTrue(scrubber.getRunning());
- scrubber.stop();
- assertFalse(scrubber.getRunning());
- }
-
- @Test
- public void testNotifyStatusChanged() {
- // init at PAUSING
- assertFalse(scrubber.shouldRun());
-
- // out of safe mode, PAUSING -> RUNNING
- scrubber.notifyStatusChanged();
- // Still cannot run, as the safemode delay has not passed.
- assertFalse(scrubber.shouldRun());
-
- testClock.fastForward(60000);
- assertTrue(scrubber.shouldRun());
-
- // go into safe mode, RUNNING -> PAUSING
- scmContext.updateSafeModeStatus(new SafeModeStatus(true, true));
- scrubber.notifyStatusChanged();
- assertFalse(scrubber.shouldRun());
- }
-
- @Test
- public void testRun() throws IOException {
- assertFalse(scrubber.shouldRun());
- // kick a run
- synchronized (scrubber) {
- scrubber.notifyStatusChanged();
- assertFalse(scrubber.shouldRun());
- testClock.fastForward(60000);
- assertTrue(scrubber.shouldRun());
- scrubber.runImmediately();
- }
- verify(pipelineManager, timeout(3000).atLeastOnce()).scrubPipelines();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org