You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/16 13:18:34 UTC
[ratis] 05/12: RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 0c2ecd615773cbb0bb6f3a475c93a79ea5982d7d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Aug 5 18:51:29 2022 -0700
RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666)
(cherry picked from commit 34867f25afc1329d722253e94e008f7616b9eb39)
---
.../ratis/client/impl/OrderedStreamAsync.java | 4 +-
.../apache/ratis/client/impl/RaftClientImpl.java | 7 +-
.../org/apache/ratis/util/TimeoutExecutor.java | 50 ++++++++++
.../org/apache/ratis/util/TimeoutScheduler.java | 22 +----
.../java/org/apache/ratis/util/TimeoutTimer.java | 109 +++++++++++++++++++++
.../grpc/client/GrpcClientProtocolClient.java | 5 +-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 2 +-
.../ratis/netty/client/NettyClientStreamRpc.java | 4 +-
.../apache/ratis/server/impl/PendingStepDown.java | 4 +-
.../impl/SnapshotManagementRequestHandler.java | 4 +-
.../ratis/server/impl/TransferLeadership.java | 4 +-
.../apache/ratis/server/impl/WatchRequests.java | 2 +-
.../apache/ratis/util/TestTimeoutScheduler.java | 4 +-
13 files changed, 183 insertions(+), 38 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index ed4a20c03..8683a7f20 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -33,7 +33,7 @@ import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,7 +107,7 @@ public class OrderedStreamAsync {
private final Semaphore requestSemaphore;
private final TimeDuration requestTimeout;
private final TimeDuration closeTimeout;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
this.dataStreamClientRpc = dataStreamClientRpc;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7313c9149..a333fe9a7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -48,7 +48,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
import java.io.IOException;
import java.util.ArrayList;
@@ -135,7 +135,7 @@ public final class RaftClientImpl implements RaftClient {
private volatile RaftPeerId leaderId;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final Supplier<OrderedAsync> orderedAsync;
private final Supplier<AsyncImpl> asyncApi;
@@ -227,7 +227,7 @@ public final class RaftClientImpl implements RaftClient {
TimeDuration.ZERO : sleepDefault;
}
- TimeoutScheduler getScheduler() {
+ TimeoutExecutor getScheduler() {
return scheduler;
}
@@ -404,7 +404,6 @@ public final class RaftClientImpl implements RaftClient {
@Override
public void close() throws IOException {
- scheduler.close();
clientRpc.close();
if (dataStreamApi.isInitialized()) {
dataStreamApi.get().close();
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java
new file mode 100644
index 000000000..aace13b88
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Execute timeout tasks.
+ *
+ * Callers obtain the shared executor from {@link #getInstance()},
+ * which returns {@link TimeoutTimer#getInstance()}.
+ */
+public interface TimeoutExecutor {
+ int MAXIMUM_POOL_SIZE = 8; // TimeoutTimer passes this value to its constructor as the number of timers
+ static TimeoutExecutor getInstance() { // delegates to the shared TimeoutTimer instance
+ return TimeoutTimer.getInstance();
+ }
+
+ /** @return the number of scheduled but not completed timeout tasks. */
+ int getTaskCount();
+
+ /**
+ * Schedule a timeout task.
+ *
+ * @param timeout the timeout value.
+ * @param task the task to run when timeout.
+ * @param errorHandler to handle the error, if there is any.
+ * @param <THROWABLE> the type of error that the task declares and the handler accepts.
+ */
+ <THROWABLE extends Throwable> void onTimeout(
+ TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler);
+
+ /**
+ * When timeout, run the task. Log the error, if there is any.
+ *
+ * This is a convenience overload of the three-argument onTimeout:
+ * the error handler it installs calls log.error(errorMessage.get(), t).
+ *
+ * @param timeout the timeout value.
+ * @param task the task to run when timeout.
+ * @param log the logger receiving the error, at ERROR level.
+ * @param errorMessage supplies the log message; it is evaluated only when an error is being logged.
+ */
+ default void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
+ onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index 9c428f2f4..cba2851f4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -21,7 +21,6 @@ import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
@@ -32,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
-public final class TimeoutScheduler implements Closeable {
+public final class TimeoutScheduler implements TimeoutExecutor {
public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class);
static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
@@ -110,7 +109,8 @@ public final class TimeoutScheduler implements Closeable {
private TimeoutScheduler() {
}
- int getQueueSize() {
+ @Override
+ public int getTaskCount() {
return scheduler.getQueueSize();
}
@@ -126,13 +126,7 @@ public final class TimeoutScheduler implements Closeable {
return scheduler.hasExecutor();
}
- /**
- * Schedule a timeout task.
- *
- * @param timeout the timeout value.
- * @param task the task to run when timeout.
- * @param errorHandler to handle the error, if there is any.
- */
+ @Override
public <THROWABLE extends Throwable> void onTimeout(
TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
onTimeout(timeout, sid -> {
@@ -186,13 +180,7 @@ public final class TimeoutScheduler implements Closeable {
}
}
- /** When timeout, run the task. Log the error, if there is any. */
- public void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
- onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
- }
-
- @Override
- public synchronized void close() {
+ public synchronized void tryShutdownScheduler() {
tryShutdownScheduler(scheduleID);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
new file mode 100644
index 000000000..36cbb2b29
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public final class TimeoutTimer implements TimeoutExecutor {
+ public static final Logger LOG = LoggerFactory.getLogger(TimeoutTimer.class);
+
+ private static final Supplier<TimeoutTimer> INSTANCE = JavaUtils.memoize(() -> new TimeoutTimer(MAXIMUM_POOL_SIZE));
+
+ public static TimeoutTimer getInstance() {
+ return INSTANCE.get();
+ }
+
+ static class Task extends TimerTask {
+ private final int id;
+ private final Runnable runnable;
+
+ Task(int id, Runnable runnable) {
+ this.id = id;
+ this.runnable = LogUtils.newRunnable(LOG, runnable, this::toString);
+ }
+
+ @Override
+ public void run() {
+ LOG.debug("run {}", this);
+ runnable.run();
+ }
+
+ @Override
+ public String toString() {
+ return "task #" + id;
+ }
+ }
+
+ /** The number of scheduled tasks. */
+ private final AtomicInteger numTasks = new AtomicInteger();
+ /** A unique ID for each task. */
+ private final AtomicInteger taskId = new AtomicInteger();
+
+ private final List<MemoizedSupplier<Timer>> timers;
+
+ private TimeoutTimer(int numTimers) {
+ final List<MemoizedSupplier<Timer>> list = new ArrayList<>(numTimers);
+ for(int i = 0; i < numTimers; i++) {
+ final String name = "timer" + i;
+ list.add(JavaUtils.memoize(() -> new Timer(name, true)));
+ }
+ this.timers = Collections.unmodifiableList(list);
+ }
+
+ @Override
+ public int getTaskCount() {
+ return numTasks.get();
+ }
+
+ private Timer getTimer(int tid) {
+ return timers.get(Math.toIntExact(Integer.toUnsignedLong(tid) % timers.size())).get();
+ }
+
+ private void schedule(TimeDuration timeout, Runnable toSchedule) {
+ final int tid = taskId.incrementAndGet();
+ final int n = numTasks.incrementAndGet();
+ LOG.debug("schedule a task #{} with timeout {}, numTasks={}", tid, timeout, n);
+ getTimer(n).schedule(new Task(tid, toSchedule), timeout.toLong(TimeUnit.MILLISECONDS));
+ }
+
+ @Override
+ public <THROWABLE extends Throwable> void onTimeout(
+ TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
+ schedule(timeout, () -> {
+ try {
+ task.run();
+ } catch(Throwable t) {
+ errorHandler.accept(JavaUtils.cast(t));
+ } finally {
+ numTasks.decrementAndGet();
+ }
+ });
+ }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index c2ee1f247..d8b128a43 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -58,7 +58,7 @@ import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +86,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final TimeDuration requestTimeoutDuration;
private final TimeDuration watchRequestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final RaftClientProtocolServiceStub asyncStub;
private final AdminProtocolServiceBlockingStub adminBlockingStub;
@@ -173,7 +173,6 @@ public class GrpcClientProtocolClient implements Closeable {
if (clientChannel != adminChannel) {
GrpcUtil.shutdownManagedChannel(adminChannel);
}
- scheduler.close();
metricClientInterceptor.close();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 69a3795ca..3f01ea299 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -63,7 +63,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private final boolean installSnapshotEnabled;
private final TimeDuration requestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private volatile StreamObservers appendLogRequestObserver;
private final boolean useSeparateHBChannel;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 37f09b8da..a03013748 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -52,7 +52,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -243,7 +243,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>();
private final TimeDuration replyQueueGracePeriod;
- private final TimeoutScheduler timeoutScheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance();
public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
index 5b928a69e..b7bfde3f6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
@@ -25,7 +25,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +81,7 @@ public class PendingStepDown {
}
private final LeaderStateImpl leader;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final PendingRequestReference pending = new PendingRequestReference();
PendingStepDown(LeaderStateImpl leaderState) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
index 416e501ad..b899752e0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
@@ -23,7 +23,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +89,7 @@ class SnapshotManagementRequestHandler {
}
private final RaftServerImpl server;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final PendingRequestReference pending = new PendingRequestReference();
SnapshotManagementRequestHandler(RaftServerImpl server) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index 03b1dfc80..3aed1a10d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -24,7 +24,7 @@ import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +74,7 @@ public class TransferLeadership {
}
private final RaftServerImpl server;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
TransferLeadership(RaftServerImpl server) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 3b95d4bf1..f4c6200b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -169,7 +169,7 @@ class WatchRequests {
private final TimeDuration watchTimeoutNanos;
private final TimeDuration watchTimeoutDenominationNanos;
- private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
WatchRequests(Object name, RaftProperties properties) {
this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
index dbe0e943f..cca1cfdea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -223,7 +223,7 @@ public class TestTimeoutScheduler extends BaseTest {
}
HUNDRED_MILLIS.sleep();
HUNDRED_MILLIS.sleep();
- JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+ JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()),
10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
final TimeDuration oneMillis = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
@@ -234,7 +234,7 @@ public class TestTimeoutScheduler extends BaseTest {
oneMillis.sleep();
}
HUNDRED_MILLIS.sleep();
- JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+ JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()),
10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
errorHandler.assertNoError();