You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ja...@apache.org on 2022/05/31 12:44:54 UTC

[ozone] branch master updated: HDDS-6804. use common backgroundScmService for backgroundPipelieScruber (#3459)

This is an automated email from the ASF dual-hosted git repository.

jacksonyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f51a0d543f HDDS-6804. use common backgroundScmService for backgroundPipelieScruber (#3459)
f51a0d543f is described below

commit f51a0d543f1bdc13829ffa60b62cb61f2256e1c8
Author: Jie Yao <ja...@tencent.com>
AuthorDate: Tue May 31 20:44:49 2022 +0800

    HDDS-6804. use common backgroundScmService for backgroundPipelieScruber (#3459)
---
 .../scm/pipeline/BackgroundPipelineScrubber.java   | 183 ---------------------
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  32 +++-
 .../pipeline/TestBackgroundPipelineScrubber.java   | 109 ------------
 3 files changed, 26 insertions(+), 298 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
deleted file mode 100644
index 3dc5f4177f..0000000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.Clock;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Background service to clean up pipelines with following conditions.
- * - CLOSED
- * - ALLOCATED for too long
- */
-public class BackgroundPipelineScrubber implements SCMService {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BackgroundPipelineScrubber.class);
-
-  private static final String THREAD_NAME = "PipelineScrubberThread";
-
-  private final PipelineManager pipelineManager;
-  private final ConfigurationSource conf;
-  private final SCMContext scmContext;
-
-  private final Lock serviceLock = new ReentrantLock();
-  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
-
-  private final AtomicBoolean running = new AtomicBoolean(false);
-  private Thread scrubThread;
-  private final long intervalInMillis;
-  private final long waitTimeInMillis;
-  private long lastTimeToBeReadyInMillis = 0;
-  private volatile boolean runImmediately = false;
-  private final Clock clock;
-
-  public BackgroundPipelineScrubber(PipelineManager pipelineManager,
-      ConfigurationSource conf, SCMContext scmContext, Clock clock) {
-    this.pipelineManager = pipelineManager;
-    this.conf = conf;
-    this.scmContext = scmContext;
-    this.clock = clock;
-
-    this.intervalInMillis = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
-        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    this.waitTimeInMillis = conf.getTimeDuration(
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-
-    start();
-  }
-
-  @Override
-  public void notifyStatusChanged() {
-    serviceLock.lock();
-    try {
-      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
-        if (serviceStatus != ServiceStatus.RUNNING) {
-          LOG.info("Service {} transitions to RUNNING.", getServiceName());
-          serviceStatus = ServiceStatus.RUNNING;
-          lastTimeToBeReadyInMillis = clock.millis();
-        }
-      } else {
-        if (serviceStatus != ServiceStatus.PAUSING) {
-          LOG.info("Service {} transitions to PAUSING.", getServiceName());
-          serviceStatus = ServiceStatus.PAUSING;
-        }
-      }
-    } finally {
-      serviceLock.unlock();
-    }
-  }
-
-  @Override
-  public boolean shouldRun() {
-    serviceLock.lock();
-    try {
-      // If safe mode is off, then this SCMService starts to run with a delay.
-      return serviceStatus == ServiceStatus.RUNNING &&
-          clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
-    } finally {
-      serviceLock.unlock();
-    }
-  }
-
-  @Override
-  public String getServiceName() {
-    return BackgroundPipelineScrubber.class.getSimpleName();
-  }
-
-  @Override
-  public void start() {
-    if (!running.compareAndSet(false, true)) {
-      LOG.info("{} Service is already running, skip start.", getServiceName());
-      return;
-    }
-    LOG.info("Starting {} Service.", getServiceName());
-
-    scrubThread = new Thread(this::run);
-    scrubThread.setName(THREAD_NAME);
-    scrubThread.setDaemon(true);
-    scrubThread.start();
-  }
-
-  @Override
-  public void stop() {
-    synchronized (this) {
-      if (!running.compareAndSet(true, false)) {
-        LOG.info("{} Service is not running, skip stop.", getServiceName());
-        return;
-      }
-      scrubThread.interrupt();
-    }
-    LOG.info("Stopping {} Service.", getServiceName());
-  }
-
-  @VisibleForTesting
-  public boolean getRunning() {
-    return running.get();
-  }
-
-  private void run() {
-    while (running.get()) {
-      try {
-        if (shouldRun()) {
-          scrubAllPipelines();
-        }
-        synchronized (this) {
-          if (!runImmediately) {
-            wait(intervalInMillis);
-          }
-          runImmediately = false;
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("{} is interrupted, exit", THREAD_NAME);
-        Thread.currentThread().interrupt();
-        running.set(false);
-      }
-    }
-  }
-
-  public synchronized void runImmediately() {
-    runImmediately = true;
-    notify();
-  }
-
-  private void scrubAllPipelines() {
-    try {
-      pipelineManager.scrubPipelines();
-    } catch (IOException e) {
-      LOG.error("Unexpected error during pipeline scrubbing", e);
-    }
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 1cc4b45261..4cb96b1d2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
@@ -76,7 +77,7 @@ public class PipelineManagerImpl implements PipelineManager {
   private PipelineFactory pipelineFactory;
   private PipelineStateManager stateManager;
   private BackgroundPipelineCreator backgroundPipelineCreator;
-  private BackgroundPipelineScrubber backgroundPipelineScrubber;
+  private BackgroundSCMService backgroundPipelineScrubber;
   private final ConfigurationSource conf;
   private final EventPublisher eventPublisher;
   // Pipeline Manager MXBean
@@ -151,9 +152,28 @@ public class PipelineManagerImpl implements PipelineManager {
     BackgroundPipelineCreator backgroundPipelineCreator =
         new BackgroundPipelineCreator(pipelineManager, conf, scmContext, clock);
 
-    BackgroundPipelineScrubber backgroundPipelineScrubber =
-        new BackgroundPipelineScrubber(pipelineManager, conf, scmContext,
-            clock);
+    final long scrubberIntervalInMillis = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    final long safeModeWaitMs = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    BackgroundSCMService backgroundPipelineScrubber =
+        new BackgroundSCMService.Builder().setClock(clock)
+            .setScmContext(scmContext)
+            .setServiceName("BackgroundPipelineScrubber")
+            .setIntervalInMillis(scrubberIntervalInMillis)
+            .setWaitTimeInMillis(safeModeWaitMs)
+            .setPeriodicalTask(() -> {
+              try {
+                pipelineManager.scrubPipelines();
+              } catch (IOException e) {
+                LOG.error("Unexpected error during pipeline scrubbing", e);
+              }
+            }).build();
 
     pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
     pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
@@ -656,12 +676,12 @@ public class PipelineManagerImpl implements PipelineManager {
   }
 
   private void setBackgroundPipelineScrubber(
-      BackgroundPipelineScrubber backgroundPipelineScrubber) {
+      BackgroundSCMService backgroundPipelineScrubber) {
     this.backgroundPipelineScrubber = backgroundPipelineScrubber;
   }
 
   @VisibleForTesting
-  public BackgroundPipelineScrubber getBackgroundPipelineScrubber() {
+  public BackgroundSCMService getBackgroundPipelineScrubber() {
     return this.backgroundPipelineScrubber;
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
deleted file mode 100644
index d60cb2294e..0000000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.ozone.test.TestClock;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.ZoneOffset;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-
-/**
- * Test for {@link BackgroundPipelineScrubber}.
- */
-public class TestBackgroundPipelineScrubber {
-
-  private BackgroundPipelineScrubber scrubber;
-  private SCMContext scmContext;
-  private PipelineManager pipelineManager;
-  private OzoneConfiguration conf;
-  private TestClock testClock;
-
-  @BeforeEach
-  public void setup() throws IOException {
-    testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
-    this.scmContext = SCMContext.emptyContext();
-    this.pipelineManager = mock(PipelineManager.class);
-    doNothing().when(pipelineManager).scrubPipelines();
-
-    this.conf = new OzoneConfiguration();
-    conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "50s");
-
-    this.scrubber = new BackgroundPipelineScrubber(pipelineManager, conf,
-        scmContext, testClock);
-  }
-
-  @AfterEach
-  public void teardown() throws IOException {
-    scrubber.stop();
-  }
-
-  @Test
-  public void testStop() {
-    assertTrue(scrubber.getRunning());
-    scrubber.stop();
-    assertFalse(scrubber.getRunning());
-  }
-
-  @Test
-  public void testNotifyStatusChanged() {
-    // init at PAUSING
-    assertFalse(scrubber.shouldRun());
-
-    // out of safe mode, PAUSING -> RUNNING
-    scrubber.notifyStatusChanged();
-    // Still cannot run, as the safemode delay has not passed.
-    assertFalse(scrubber.shouldRun());
-
-    testClock.fastForward(60000);
-    assertTrue(scrubber.shouldRun());
-
-    // go into safe mode, RUNNING -> PAUSING
-    scmContext.updateSafeModeStatus(new SafeModeStatus(true, true));
-    scrubber.notifyStatusChanged();
-    assertFalse(scrubber.shouldRun());
-  }
-
-  @Test
-  public void testRun() throws IOException {
-    assertFalse(scrubber.shouldRun());
-    // kick a run
-    synchronized (scrubber) {
-      scrubber.notifyStatusChanged();
-      assertFalse(scrubber.shouldRun());
-      testClock.fastForward(60000);
-      assertTrue(scrubber.shouldRun());
-      scrubber.runImmediately();
-    }
-    verify(pipelineManager, timeout(3000).atLeastOnce()).scrubPipelines();
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org