You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/05/24 20:18:06 UTC
[ozone] branch master updated: HDDS-6771. EC: ReplicationManager - make ContainerReplicaPendingOps into a SCM service (#3445)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 94dd1ea7c5 HDDS-6771. EC: ReplicationManager - make ContainerReplicaPendingOps into a SCM service (#3445)
94dd1ea7c5 is described below
commit 94dd1ea7c579fa79c60e71191a1c8dba81a219a0
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Wed May 25 04:18:00 2022 +0800
HDDS-6771. EC: ReplicationManager - make ContainerReplicaPendingOps into a SCM service (#3445)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 11 ++
.../common/src/main/resources/ozone-default.xml | 18 ++
.../hadoop/hdds/scm/ha/BackgroundSCMService.java | 208 +++++++++++++++++++++
.../hdds/scm/server/StorageContainerManager.java | 31 +++
.../hdds/scm/ha/TestBackgroundSCMService.java | 113 +++++++++++
5 files changed, 381 insertions(+)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index a2a0d28bd5..b10f05cd1a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -359,6 +359,17 @@ public final class ScmConfigKeys {
public static final boolean OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS_DEFAULT =
false;
+  /**
+   * Interval of the fixed-interval job that scrubs expired container replica
+   * operations.
+   */
+  public static final String
+      OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL =
+      "ozone.scm.expired.container.replica.op.scrub.interval";
+  public static final String
+      OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL_DEFAULT = "5m";
+
+  /** Timeout for pending container replica operations (ADD/DELETE). */
+  public static final String OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT =
+      "ozone.scm.expired.container.replica.op.time.out";
+  public static final String
+      OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT_DEFAULT = "30m";
+
// Upper limit for how many pipelines can be created
// across the cluster nodes managed by SCM.
// Only for test purpose now.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a10b4315d8..614b8af411 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1377,6 +1377,24 @@
scrub pipelines.
</description>
</property>
+ <property>
+ <name>ozone.scm.expired.container.replica.op.scrub.interval</name>
+ <value>5m</value>
+ <tag>OZONE, SCM, CONTAINER</tag>
+ <description>
+ SCM schedules a fixed interval job using the configured interval to
+ scrub expired container replica operations.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.expired.container.replica.op.time.out</name>
+ <value>30m</value>
+ <tag>OZONE, SCM, CONTAINER</tag>
+ <description>
+ Timeout for pending container replica operations (ADD/DELETE). After
+ this timeout the command will be retried.
+ </description>
+ </property>
<property>
<name>ozone.scm.pipeline.creation.auto.factor.one</name>
<value>true</value>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/BackgroundSCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/BackgroundSCMService.java
new file mode 100644
index 0000000000..c4e581318f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/BackgroundSCMService.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A common implementation for a background {@link SCMService}.
+ *
+ * <p>A daemon thread, started from the constructor, repeatedly executes the
+ * supplied periodical task and then waits for {@code intervalInMillis}, or
+ * until {@link #runImmediately()} wakes it up. The task is executed only
+ * while {@link #shouldRun()} is true: the service must be RUNNING and at
+ * least {@code waitTimeInMillis} must have elapsed since it last became
+ * RUNNING. {@link #notifyStatusChanged()} moves the service between RUNNING
+ * and PAUSING based on the leader-ready and safe-mode state of the
+ * {@link SCMContext}.
+ *
+ * <p>A RuntimeException thrown by the task is logged and the task is
+ * attempted again on the next iteration; it does not terminate the thread.
+ */
+public final class BackgroundSCMService implements SCMService {
+  private final Logger log;
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  // Guarded by "this": written in start() and read in stop().
+  private Thread backgroundThread;
+  // Guards serviceStatus and lastTimeToBeReadyInMillis.
+  private final Lock serviceLock = new ReentrantLock();
+  private final SCMContext scmContext;
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+  private final long intervalInMillis;
+  private final long waitTimeInMillis;
+  private long lastTimeToBeReadyInMillis = 0;
+  private final Clock clock;
+  private final String serviceName;
+  private final Runnable periodicalTask;
+  // Set by runImmediately() so the loop skips its next interval wait.
+  private volatile boolean runImmediately = false;
+
+  private BackgroundSCMService(
+      final Clock clock, final SCMContext scmContext,
+      final String serviceName, final long intervalInMillis,
+      final long waitTimeInMillis, final Runnable task) {
+    this.scmContext = scmContext;
+    this.clock = clock;
+    this.periodicalTask = task;
+    this.serviceName = serviceName;
+    this.log = LoggerFactory.getLogger(serviceName);
+    this.intervalInMillis = intervalInMillis;
+    this.waitTimeInMillis = waitTimeInMillis;
+    // Every field is assigned above, so the new thread observes a fully
+    // initialized object (Thread.start() happens-before the thread's run).
+    start();
+  }
+
+  /**
+   * Starts the background daemon thread, unless it is already running.
+   * Synchronized so that the write of {@code backgroundThread} is visible to
+   * {@link #stop()}, which reads it under the same monitor.
+   */
+  @Override
+  public synchronized void start() {
+    if (!running.compareAndSet(false, true)) {
+      log.info("{} Service is already running, skip start.", getServiceName());
+      return;
+    }
+    log.info("Starting {} Service.", getServiceName());
+
+    backgroundThread = new Thread(this::run);
+    backgroundThread.setName(serviceName + "Thread");
+    backgroundThread.setDaemon(true);
+    backgroundThread.start();
+  }
+
+  /**
+   * Re-evaluates the service status from the SCM context. The service
+   * becomes RUNNING when the SCM is leader-ready and out of safe mode
+   * (recording the transition time for the start delay), and PAUSING
+   * otherwise.
+   */
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+        if (serviceStatus != ServiceStatus.RUNNING) {
+          log.info("Service {} transitions to RUNNING.", getServiceName());
+          serviceStatus = ServiceStatus.RUNNING;
+          lastTimeToBeReadyInMillis = clock.millis();
+        }
+      } else {
+        if (serviceStatus != ServiceStatus.PAUSING) {
+          log.info("Service {} transitions to PAUSING.", getServiceName());
+          serviceStatus = ServiceStatus.PAUSING;
+        }
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  /**
+   * Body of the background thread: runs the task when allowed, then waits
+   * for the interval (or a runImmediately() kick) until stopped or
+   * interrupted.
+   */
+  private void run() {
+    while (running.get()) {
+      try {
+        if (shouldRun()) {
+          runTaskSafely();
+        }
+        synchronized (this) {
+          if (!runImmediately) {
+            wait(intervalInMillis);
+          }
+          runImmediately = false;
+        }
+      } catch (InterruptedException e) {
+        log.warn("{} is interrupted, exit", serviceName);
+        Thread.currentThread().interrupt();
+        running.set(false);
+      }
+    }
+  }
+
+  /**
+   * Executes one iteration of the periodical task. A RuntimeException from
+   * the task is logged rather than propagated; otherwise it would terminate
+   * the background thread while {@code running} stayed true, leaving the
+   * service silently dead and impossible to restart via start().
+   */
+  private void runTaskSafely() {
+    try {
+      periodicalTask.run();
+    } catch (RuntimeException e) {
+      log.error("{} failed to run the periodical task, will retry in the "
+          + "next iteration.", serviceName, e);
+    }
+  }
+
+  /**
+   * Stops the background thread by clearing the running flag and
+   * interrupting it. Does nothing if the service is not running.
+   */
+  @Override
+  public void stop() {
+    synchronized (this) {
+      if (!running.compareAndSet(true, false)) {
+        log.info("{} Service is not running, skip stop.", getServiceName());
+        return;
+      }
+      backgroundThread.interrupt();
+    }
+    log.info("Stopping {} Service.", getServiceName());
+  }
+
+  /**
+   * @return true if the service is RUNNING and at least waitTimeInMillis
+   *     has elapsed since it last transitioned to RUNNING
+   */
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      return serviceStatus == ServiceStatus.RUNNING &&
+          clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  /**
+   * Wakes the background thread so that it skips the remaining interval
+   * wait. The task itself still runs only if shouldRun() is true.
+   */
+  @VisibleForTesting
+  public synchronized void runImmediately() {
+    runImmediately = true;
+    notify();
+  }
+
+  /** @return whether the background thread is flagged as running. */
+  @VisibleForTesting
+  public boolean getRunning() {
+    return running.get();
+  }
+
+  /**
+   * Builder for BackgroundSCMService.
+   */
+  public static class Builder {
+    private long intervalInMillis;
+    private long waitTimeInMillis;
+    private String serviceName;
+    private Runnable periodicalTask;
+    private SCMContext scmContext;
+    private Clock clock;
+
+    /** Sets the wait between task runs; must be positive. */
+    public Builder setIntervalInMillis(final long intervalInMillis) {
+      this.intervalInMillis = intervalInMillis;
+      return this;
+    }
+
+    /** Sets the delay after becoming RUNNING before the task may run. */
+    public Builder setWaitTimeInMillis(final long waitTimeInMillis) {
+      this.waitTimeInMillis = waitTimeInMillis;
+      return this;
+    }
+
+    public Builder setClock(final Clock clock) {
+      this.clock = clock;
+      return this;
+    }
+
+    /** Sets the service name, also used as logger and thread name prefix. */
+    public Builder setServiceName(final String serviceName) {
+      this.serviceName = serviceName;
+      return this;
+    }
+
+    public Builder setScmContext(final SCMContext scmContext) {
+      this.scmContext = scmContext;
+      return this;
+    }
+
+    public Builder setPeriodicalTask(final Runnable periodicalTask) {
+      this.periodicalTask = periodicalTask;
+      return this;
+    }
+
+    /**
+     * Builds the service and starts its background thread.
+     *
+     * @throws IllegalArgumentException if intervalInMillis is not positive:
+     *     Object.wait(0) would block until notified (the task would never
+     *     run on its own) and a negative value makes wait() throw inside
+     *     the background thread, killing it
+     */
+    public BackgroundSCMService build() {
+      Preconditions.assertNotNull(scmContext, "scmContext is null");
+      Preconditions.assertNotNull(periodicalTask, "periodicalTask is null");
+      Preconditions.assertNotNull(clock, "clock is null");
+      Preconditions.assertNotNull(serviceName, "serviceName is null");
+      if (intervalInMillis <= 0) {
+        throw new IllegalArgumentException(
+            "intervalInMillis must be positive, but was " + intervalInMillis);
+      }
+
+      return new BackgroundSCMService(clock, scmContext, serviceName,
+          intervalInMillis, waitTimeInMillis, periodicalTask);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 4353d23b45..9e651c8dfd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
+import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -625,6 +626,36 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
+
+ // Interval between two runs of the expired container replica op scrubber.
+ long containerReplicaOpScrubberIntervalMs = conf.getTimeDuration(
+ ScmConfigKeys
+ .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
+ ScmConfigKeys
+ .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ // Timeout handed to ContainerReplicaPendingOps#removeExpiredEntries on
+ // every run; per ozone-default.xml it bounds how long an ADD/DELETE
+ // replica op may stay pending.
+ long containerReplicaOpExpiryMs = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ // Reuses the generic "wait after safe mode exit" setting as the delay
+ // between the service becoming RUNNING and its first task execution.
+ long backgroundServiceSafemodeWaitMs = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ // The service name is also used as its logger name and thread name
+ // prefix. build() starts the background thread; the task only runs once
+ // notifyStatusChanged() has moved the service to RUNNING.
+ final String backgroundServiceName = "ExpiredContainerReplicaOpScrubber";
+ BackgroundSCMService expiredContainerReplicaOpScrubber =
+ new BackgroundSCMService.Builder().setClock(clock)
+ .setScmContext(scmContext)
+ .setServiceName(backgroundServiceName)
+ .setIntervalInMillis(containerReplicaOpScrubberIntervalMs)
+ .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs)
+ .setPeriodicalTask(() -> containerReplicaPendingOps
+ .removeExpiredEntries(containerReplicaOpExpiryMs)).build();
+
+ // NOTE(review): presumably registration is what routes SCM status
+ // changes to notifyStatusChanged(); confirm in SCMServiceManager.
+ serviceManager.register(expiredContainerReplicaOpScrubber);
+
if (configurator.getContainerManager() != null) {
containerManager = configurator.getContainerManager();
} else {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestBackgroundSCMService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestBackgroundSCMService.java
new file mode 100644
index 0000000000..e4ef160913
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestBackgroundSCMService.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.ozone.test.TestClock;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for BackgroundSCMService.
+ * */
+public class TestBackgroundSCMService {
+ private BackgroundSCMService backgroundSCMService;
+ private TestClock testClock;
+ private SCMContext scmContext;
+ private PipelineManager pipelineManager;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
+ scmContext = SCMContext.emptyContext();
+ this.pipelineManager = mock(PipelineManager.class);
+ doNothing().when(pipelineManager).scrubPipelines();
+ this.backgroundSCMService = new BackgroundSCMService.Builder()
+ .setClock(testClock)
+ .setScmContext(scmContext)
+ .setServiceName("testBackgroundService")
+ .setIntervalInMillis(1L)
+ .setWaitTimeInMillis(1L)
+ .setPeriodicalTask(() -> {
+ try {
+ pipelineManager.scrubPipelines();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).build();
+ }
+
+ @AfterEach
+ public void teardown() {
+ backgroundSCMService.stop();
+ }
+
+ @Test
+ public void testStop() {
+ assertTrue(backgroundSCMService.getRunning());
+ backgroundSCMService.stop();
+ assertFalse(backgroundSCMService.getRunning());
+ }
+
+ @Test
+ public void testNotifyStatusChanged() {
+ // init at PAUSING
+ assertFalse(backgroundSCMService.shouldRun());
+
+ // out of safe mode, PAUSING -> RUNNING
+ backgroundSCMService.notifyStatusChanged();
+ // Still cannot run, as the safemode delay has not passed.
+ assertFalse(backgroundSCMService.shouldRun());
+
+ testClock.fastForward(60000);
+ assertTrue(backgroundSCMService.shouldRun());
+
+ // go into safe mode, RUNNING -> PAUSING
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, true));
+ backgroundSCMService.notifyStatusChanged();
+ assertFalse(backgroundSCMService.shouldRun());
+ }
+
+ @Test
+ public void testRun() throws IOException {
+ assertFalse(backgroundSCMService.shouldRun());
+ // kick a run
+ synchronized (backgroundSCMService) {
+ backgroundSCMService.notifyStatusChanged();
+ assertFalse(backgroundSCMService.shouldRun());
+ testClock.fastForward(60000);
+ assertTrue(backgroundSCMService.shouldRun());
+ backgroundSCMService.runImmediately();
+ }
+ verify(pipelineManager, timeout(3000).atLeastOnce()).scrubPipelines();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org