You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/05/24 20:18:06 UTC

[ozone] branch master updated: HDDS-6771. EC: ReplicationManager - make ContainerReplicaPendingOps into a SCM service (#3445)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 94dd1ea7c5 HDDS-6771. EC: ReplicationManager - make ContainerReplicaPendingOps into a SCM service (#3445)
94dd1ea7c5 is described below

commit 94dd1ea7c579fa79c60e71191a1c8dba81a219a0
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Wed May 25 04:18:00 2022 +0800

    HDDS-6771. EC: ReplicationManager - make ContainerReplicaPendingOps into a SCM service (#3445)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  11 ++
 .../common/src/main/resources/ozone-default.xml    |  18 ++
 .../hadoop/hdds/scm/ha/BackgroundSCMService.java   | 208 +++++++++++++++++++++
 .../hdds/scm/server/StorageContainerManager.java   |  31 +++
 .../hdds/scm/ha/TestBackgroundSCMService.java      | 113 +++++++++++
 5 files changed, 381 insertions(+)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index a2a0d28bd5..b10f05cd1a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -359,6 +359,17 @@ public final class ScmConfigKeys {
   public static final boolean OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS_DEFAULT =
       false;
 
+  public static final String
+      OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL =
+      "ozone.scm.expired.container.replica.op.scrub.interval";
+  public static final String
+      OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL_DEFAULT =
+      "5m";
+  public static final String OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT =
+      "ozone.scm.expired.container.replica.op.time.out";
+  public static final String
+      OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT_DEFAULT = "30m";
+
   // Upper limit for how many pipelines can be created
   // across the cluster nodes managed by SCM.
   // Only for test purpose now.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a10b4315d8..614b8af411 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1377,6 +1377,24 @@
       scrub pipelines.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.expired.container.replica.op.scrub.interval</name>
+    <value>5m</value>
+    <tag>OZONE, SCM, CONTAINER</tag>
+    <description>
+      SCM schedules a fixed interval job using the configured interval to
+      scrub expired container replica operations.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.expired.container.replica.op.time.out</name>
+    <value>30m</value>
+    <tag>OZONE, SCM, CONTAINER</tag>
+    <description>
+      Timeout for the container replica operations (ADD/DELETE). After this
+      timeout the command will be retried.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.pipeline.creation.auto.factor.one</name>
     <value>true</value>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/BackgroundSCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/BackgroundSCMService.java
new file mode 100644
index 0000000000..c4e581318f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/BackgroundSCMService.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Runs a Runnable periodically on a daemon thread while SCM status allows it.
+ * */
+public final class BackgroundSCMService implements SCMService {
+  private final Logger log; // per-instance logger named after serviceName
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private Thread backgroundThread; // assigned in start()
+  private final Lock serviceLock = new ReentrantLock(); // guards status + ready time
+  private final SCMContext scmContext;
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+  private final long intervalInMillis; // wait between loop iterations
+  private final long waitTimeInMillis; // delay required after turning RUNNING
+  private long lastTimeToBeReadyInMillis = 0; // clock time of last switch to RUNNING
+  private final Clock clock;
+  private final String serviceName;
+  private final Runnable periodicalTask;
+  private volatile boolean runImmediately = false; // set by runImmediately()
+
+  private BackgroundSCMService(
+      final Clock clock, final SCMContext scmContext,
+      final String serviceName, final long intervalInMillis,
+      final long waitTimeInMillis, final Runnable task) {
+    this.scmContext = scmContext;
+    this.clock = clock;
+    this.periodicalTask = task;
+    this.serviceName = serviceName;
+    this.log = LoggerFactory.getLogger(serviceName);
+    this.intervalInMillis = intervalInMillis;
+    this.waitTimeInMillis = waitTimeInMillis;
+    start(); // the worker thread is launched as part of construction
+  }
+
+  @Override
+  public void start() {
+    if (!running.compareAndSet(false, true)) {
+      log.info("{} Service is already running, skip start.", getServiceName());
+      return;
+    }
+    log.info("Starting {} Service.", getServiceName());
+
+    backgroundThread = new Thread(this::run);
+    backgroundThread.setName(serviceName + "Thread");
+    backgroundThread.setDaemon(true);
+    backgroundThread.start();
+  }
+
+  @Override
+  public void notifyStatusChanged() { // re-reads scmContext and updates serviceStatus
+    serviceLock.lock();
+    try {
+      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+        if (serviceStatus != ServiceStatus.RUNNING) {
+          log.info("Service {} transitions to RUNNING.", getServiceName());
+          serviceStatus = ServiceStatus.RUNNING;
+          lastTimeToBeReadyInMillis = clock.millis(); // restarts the shouldRun() delay
+        }
+      } else {
+        if (serviceStatus != ServiceStatus.PAUSING) {
+          log.info("Service {} transitions to PAUSING.", getServiceName());
+          serviceStatus = ServiceStatus.PAUSING;
+        }
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  private void run() { // body of backgroundThread
+    while (running.get()) {
+      try {
+        if (shouldRun()) {
+          periodicalTask.run(); // NOTE(review): an unchecked exception here escapes run()
+        }
+        synchronized (this) {
+          if (!runImmediately) {
+            wait(intervalInMillis); // NOTE(review): a 0 interval waits with no timeout
+          }
+          runImmediately = false;
+        }
+      } catch (InterruptedException e) {
+        log.warn("{} is interrupted, exit", serviceName);
+        Thread.currentThread().interrupt();
+        running.set(false); // ends the loop
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    synchronized (this) {
+      if (!running.compareAndSet(true, false)) {
+        log.info("{} Service is not running, skip stop.", getServiceName());
+        return;
+      }
+      backgroundThread.interrupt(); // run() exits via its InterruptedException handler
+    }
+    log.info("Stopping {} Service.", getServiceName());
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      return serviceStatus == ServiceStatus.RUNNING &&
+          clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  @VisibleForTesting
+  public synchronized void runImmediately() { // makes run() skip or cut short its wait
+    runImmediately = true;
+    notify(); // same monitor that run() waits on
+  }
+
+  @VisibleForTesting
+  public boolean getRunning() {
+    return running.get();
+  }
+
+  /**
+   * Builder for BackgroundSCMService.
+   * */
+  public static class Builder {
+    private long intervalInMillis; // stays 0 unless set
+    private long waitTimeInMillis; // stays 0 unless set
+    private String serviceName;
+    private Runnable periodicalTask;
+    private SCMContext scmContext;
+    private Clock clock;
+
+    public Builder setIntervalInMillis(final long intervalInMillis) {
+      this.intervalInMillis = intervalInMillis;
+      return this;
+    }
+
+    public Builder setWaitTimeInMillis(final long waitTimeInMillis) {
+      this.waitTimeInMillis = waitTimeInMillis;
+      return this;
+    }
+
+    public Builder setClock(final Clock clock) {
+      this.clock = clock;
+      return this;
+    }
+
+    public Builder setServiceName(final String serviceName) {
+      this.serviceName = serviceName;
+      return this;
+    }
+
+    public Builder setScmContext(final SCMContext scmContext) {
+      this.scmContext = scmContext;
+      return this;
+    }
+
+    public Builder setPeriodicalTask(final Runnable periodicalTask) {
+      this.periodicalTask = periodicalTask;
+      return this;
+    }
+
+    public BackgroundSCMService build() { // the two long settings are not validated
+      Preconditions.assertNotNull(scmContext, "scmContext is null");
+      Preconditions.assertNotNull(periodicalTask, "periodicalTask is null");
+      Preconditions.assertNotNull(clock, "clock is null");
+      Preconditions.assertNotNull(serviceName, "serviceName is null");
+
+      return new BackgroundSCMService(clock, scmContext, serviceName,
+          intervalInMillis, waitTimeInMillis, periodicalTask);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 4353d23b45..9e651c8dfd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
+import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
 import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -625,6 +626,36 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     }
 
     containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
+
+    long containerReplicaOpScrubberIntervalMs = conf.getTimeDuration(
+        ScmConfigKeys
+            .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
+        ScmConfigKeys
+            .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    long containerReplicaOpExpiryMs = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    long backgroundServiceSafemodeWaitMs = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    final String backgroundServiceName = "ExpiredContainerReplicaOpScrubber";
+    BackgroundSCMService expiredContainerReplicaOpScrubber =
+        new BackgroundSCMService.Builder().setClock(clock)
+            .setScmContext(scmContext)
+            .setServiceName(backgroundServiceName)
+            .setIntervalInMillis(containerReplicaOpScrubberIntervalMs)
+            .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs)
+            .setPeriodicalTask(() -> containerReplicaPendingOps
+                .removeExpiredEntries(containerReplicaOpExpiryMs)).build();
+
+    serviceManager.register(expiredContainerReplicaOpScrubber);
+
     if (configurator.getContainerManager() != null) {
       containerManager = configurator.getContainerManager();
     } else {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestBackgroundSCMService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestBackgroundSCMService.java
new file mode 100644
index 0000000000..e4ef160913
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestBackgroundSCMService.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.ozone.test.TestClock;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests start/stop, status gating and task execution of BackgroundSCMService.
+ * */
+public class TestBackgroundSCMService {
+  private BackgroundSCMService backgroundSCMService;
+  private TestClock testClock;
+  private SCMContext scmContext;
+  private PipelineManager pipelineManager; // mock; its scrubPipelines() is the task
+
+  @BeforeEach
+  public void setup() throws IOException {
+    testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
+    scmContext = SCMContext.emptyContext();
+    this.pipelineManager = mock(PipelineManager.class);
+    doNothing().when(pipelineManager).scrubPipelines();
+    this.backgroundSCMService = new BackgroundSCMService.Builder()
+        .setClock(testClock)
+        .setScmContext(scmContext)
+        .setServiceName("testBackgroundService")
+        .setIntervalInMillis(1L)
+        .setWaitTimeInMillis(1L)
+        .setPeriodicalTask(() -> {
+          try {
+            pipelineManager.scrubPipelines();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }).build(); // constructing the service also starts its thread
+  }
+
+  @AfterEach
+  public void teardown() {
+    backgroundSCMService.stop(); // only logs and returns if already stopped
+  }
+
+  @Test
+  public void testStop() {
+    assertTrue(backgroundSCMService.getRunning());
+    backgroundSCMService.stop();
+    assertFalse(backgroundSCMService.getRunning());
+  }
+
+  @Test
+  public void testNotifyStatusChanged() {
+    // init at PAUSING
+    assertFalse(backgroundSCMService.shouldRun());
+
+    // out of safe mode, PAUSING -> RUNNING
+    backgroundSCMService.notifyStatusChanged();
+    // Still cannot run, as the safemode delay has not passed.
+    assertFalse(backgroundSCMService.shouldRun());
+
+    testClock.fastForward(60000); // past the 1 ms wait time — assumes millis, confirm
+    assertTrue(backgroundSCMService.shouldRun());
+
+    // go into safe mode, RUNNING -> PAUSING
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, true));
+    backgroundSCMService.notifyStatusChanged();
+    assertFalse(backgroundSCMService.shouldRun());
+  }
+
+  @Test
+  public void testRun() throws IOException {
+    assertFalse(backgroundSCMService.shouldRun());
+    // kick a run
+    synchronized (backgroundSCMService) { // same monitor the service thread waits on
+      backgroundSCMService.notifyStatusChanged();
+      assertFalse(backgroundSCMService.shouldRun());
+      testClock.fastForward(60000);
+      assertTrue(backgroundSCMService.shouldRun());
+      backgroundSCMService.runImmediately(); // synchronized method, re-entered here
+    }
+    verify(pipelineManager, timeout(3000).atLeastOnce()).scrubPipelines();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org