You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/16 13:18:34 UTC

[ratis] 05/12: RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 0c2ecd615773cbb0bb6f3a475c93a79ea5982d7d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Aug 5 18:51:29 2022 -0700

    RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666)
    
    
    (cherry picked from commit 34867f25afc1329d722253e94e008f7616b9eb39)
---
 .../ratis/client/impl/OrderedStreamAsync.java      |   4 +-
 .../apache/ratis/client/impl/RaftClientImpl.java   |   7 +-
 .../org/apache/ratis/util/TimeoutExecutor.java     |  50 ++++++++++
 .../org/apache/ratis/util/TimeoutScheduler.java    |  22 +----
 .../java/org/apache/ratis/util/TimeoutTimer.java   | 109 +++++++++++++++++++++
 .../grpc/client/GrpcClientProtocolClient.java      |   5 +-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   2 +-
 .../ratis/netty/client/NettyClientStreamRpc.java   |   4 +-
 .../apache/ratis/server/impl/PendingStepDown.java  |   4 +-
 .../impl/SnapshotManagementRequestHandler.java     |   4 +-
 .../ratis/server/impl/TransferLeadership.java      |   4 +-
 .../apache/ratis/server/impl/WatchRequests.java    |   2 +-
 .../apache/ratis/util/TestTimeoutScheduler.java    |   4 +-
 13 files changed, 183 insertions(+), 38 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index ed4a20c03..8683a7f20 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -33,7 +33,7 @@ import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SlidingWindow;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,7 +107,7 @@ public class OrderedStreamAsync {
   private final Semaphore requestSemaphore;
   private final TimeDuration requestTimeout;
   private final TimeDuration closeTimeout;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
     this.dataStreamClientRpc = dataStreamClientRpc;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7313c9149..a333fe9a7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -48,7 +48,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -135,7 +135,7 @@ public final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private final Supplier<OrderedAsync> orderedAsync;
   private final Supplier<AsyncImpl> asyncApi;
@@ -227,7 +227,7 @@ public final class RaftClientImpl implements RaftClient {
         TimeDuration.ZERO : sleepDefault;
   }
 
-  TimeoutScheduler getScheduler() {
+  TimeoutExecutor getScheduler() {
     return scheduler;
   }
 
@@ -404,7 +404,6 @@ public final class RaftClientImpl implements RaftClient {
 
   @Override
   public void close() throws IOException {
-    scheduler.close();
     clientRpc.close();
     if (dataStreamApi.isInitialized()) {
       dataStreamApi.get().close();
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java
new file mode 100644
index 000000000..aace13b88
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Execute timeout tasks.  The instance returned by {@link #getInstance()} is a {@link TimeoutTimer}. */
+public interface TimeoutExecutor {
+  int MAXIMUM_POOL_SIZE =  8; // passed by TimeoutTimer to its constructor as the number of timers
+  static TimeoutExecutor getInstance() {
+    return TimeoutTimer.getInstance();
+  }
+
+  /** @return the number of scheduled but not completed timeout tasks. */
+  int getTaskCount();
+
+  /**
+   * Schedule a timeout task.
+   *
+   * @param timeout the timeout value, i.e. the delay before the task is run.
+   * @param task the task to run when timeout.
+   * @param errorHandler to handle the error from running the task, if there is any.
+   */
+  <THROWABLE extends Throwable> void onTimeout(
+      TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler);
+
+  /** When timeout, run the task.  If there is an error, log it with {@code log.error} and the given message. */
+  default void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
+    onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index 9c428f2f4..cba2851f4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -21,7 +21,6 @@ import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ScheduledFuture;
@@ -32,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public final class TimeoutScheduler implements Closeable {
+public final class TimeoutScheduler implements TimeoutExecutor {
   public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class);
 
   static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
@@ -110,7 +109,8 @@ public final class TimeoutScheduler implements Closeable {
   private TimeoutScheduler() {
   }
 
-  int getQueueSize() {
+  @Override
+  public int getTaskCount() {
     return scheduler.getQueueSize();
   }
 
@@ -126,13 +126,7 @@ public final class TimeoutScheduler implements Closeable {
     return scheduler.hasExecutor();
   }
 
-  /**
-   * Schedule a timeout task.
-   *
-   * @param timeout the timeout value.
-   * @param task the task to run when timeout.
-   * @param errorHandler to handle the error, if there is any.
-   */
+  @Override
   public <THROWABLE extends Throwable> void onTimeout(
       TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
     onTimeout(timeout, sid -> {
@@ -186,13 +180,7 @@ public final class TimeoutScheduler implements Closeable {
     }
   }
 
-  /** When timeout, run the task.  Log the error, if there is any. */
-  public void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
-    onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
-  }
-
-  @Override
-  public synchronized void close() {
+  public synchronized void tryShutdownScheduler() {
     tryShutdownScheduler(scheduleID);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
new file mode 100644
index 000000000..36cbb2b29
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public final class TimeoutTimer implements TimeoutExecutor { // runs timeout tasks on a fixed list of daemon Timers
+  public static final Logger LOG = LoggerFactory.getLogger(TimeoutTimer.class);
+
+  private static final Supplier<TimeoutTimer> INSTANCE = JavaUtils.memoize(() -> new TimeoutTimer(MAXIMUM_POOL_SIZE));
+
+  public static TimeoutTimer getInstance() {
+    return INSTANCE.get();
+  }
+
+  static class Task extends TimerTask { // a TimerTask carrying an id that is shown in log messages
+    private final int id;
+    private final Runnable runnable;
+
+    Task(int id, Runnable runnable) {
+      this.id = id;
+      this.runnable = LogUtils.newRunnable(LOG, runnable, this::toString); // presumably adds logging; see LogUtils
+    }
+
+    @Override
+    public void run() {
+      LOG.debug("run {}", this);
+      runnable.run();
+    }
+
+    @Override
+    public String toString() {
+      return "task #" + id;
+    }
+  }
+
+  /** The number of tasks scheduled but not yet finished; decremented when a task finishes running. */
+  private final AtomicInteger numTasks = new AtomicInteger();
+  /** A counter generating an ID for each task; the ID appears in log messages. */
+  private final AtomicInteger taskId = new AtomicInteger();
+
+  private final List<MemoizedSupplier<Timer>> timers; // unmodifiable; Timers presumably created on first get()
+
+  private TimeoutTimer(int numTimers) {
+    final List<MemoizedSupplier<Timer>> list = new ArrayList<>(numTimers);
+    for(int i = 0; i < numTimers; i++) {
+      final String name = "timer" + i;
+      list.add(JavaUtils.memoize(() -> new Timer(name, true))); // true: the timer thread runs as a daemon
+    }
+    this.timers = Collections.unmodifiableList(list);
+  }
+
+  @Override
+  public int getTaskCount() {
+    return numTasks.get();
+  }
+
+  private Timer getTimer(int tid) { // the unsigned modulo keeps the index non-negative
+    return timers.get(Math.toIntExact(Integer.toUnsignedLong(tid) % timers.size())).get();
+  }
+
+  private void schedule(TimeDuration timeout, Runnable toSchedule) {
+    final int tid = taskId.incrementAndGet();
+    final int n = numTasks.incrementAndGet(); // NOTE(review): the timer is chosen by n, not tid -- confirm intended
+    LOG.debug("schedule a task #{} with timeout {}, numTasks={}", tid, timeout, n);
+    getTimer(n).schedule(new Task(tid, toSchedule), timeout.toLong(TimeUnit.MILLISECONDS));
+  }
+
+  @Override
+  public <THROWABLE extends Throwable> void onTimeout(
+      TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
+    schedule(timeout, () -> {
+      try {
+        task.run();
+      } catch(Throwable t) {
+        errorHandler.accept(JavaUtils.cast(t)); // presumably an unchecked cast to THROWABLE
+      } finally {
+        numTasks.decrementAndGet(); // the task no longer counts as pending, even if it threw
+      }
+    });
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index c2ee1f247..d8b128a43 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -58,7 +58,7 @@ import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +86,7 @@ public class GrpcClientProtocolClient implements Closeable {
 
   private final TimeDuration requestTimeoutDuration;
   private final TimeDuration watchRequestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private final RaftClientProtocolServiceStub asyncStub;
   private final AdminProtocolServiceBlockingStub adminBlockingStub;
@@ -173,7 +173,6 @@ public class GrpcClientProtocolClient implements Closeable {
     if (clientChannel != adminChannel) {
       GrpcUtil.shutdownManagedChannel(adminChannel);
     }
-    scheduler.close();
     metricClientInterceptor.close();
   }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 69a3795ca..3f01ea299 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -63,7 +63,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final boolean installSnapshotEnabled;
 
   private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private volatile StreamObservers appendLogRequestObserver;
   private final boolean useSeparateHBChannel;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 37f09b8da..a03013748 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -52,7 +52,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -243,7 +243,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
   private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>();
   private final TimeDuration replyQueueGracePeriod;
-  private final TimeoutScheduler timeoutScheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance();
 
   public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
index 5b928a69e..b7bfde3f6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
@@ -25,7 +25,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +81,7 @@ public class PendingStepDown {
   }
 
   private final LeaderStateImpl leader;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
   private final PendingRequestReference pending = new PendingRequestReference();
 
   PendingStepDown(LeaderStateImpl leaderState) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
index 416e501ad..b899752e0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
@@ -23,7 +23,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +89,7 @@ class SnapshotManagementRequestHandler {
   }
 
   private final RaftServerImpl server;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
   private final PendingRequestReference pending = new PendingRequestReference();
 
   SnapshotManagementRequestHandler(RaftServerImpl server) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index 03b1dfc80..3aed1a10d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -24,7 +24,7 @@ import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +74,7 @@ public class TransferLeadership {
   }
 
   private final RaftServerImpl server;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
   private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
 
   TransferLeadership(RaftServerImpl server) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 3b95d4bf1..f4c6200b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -169,7 +169,7 @@ class WatchRequests {
 
   private final TimeDuration watchTimeoutNanos;
   private final TimeDuration watchTimeoutDenominationNanos;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   WatchRequests(Object name, RaftProperties properties) {
     this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
index dbe0e943f..cca1cfdea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -223,7 +223,7 @@ public class TestTimeoutScheduler extends BaseTest {
     }
     HUNDRED_MILLIS.sleep();
     HUNDRED_MILLIS.sleep();
-    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()),
         10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
 
     final TimeDuration oneMillis = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
@@ -234,7 +234,7 @@ public class TestTimeoutScheduler extends BaseTest {
       oneMillis.sleep();
     }
     HUNDRED_MILLIS.sleep();
-    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()),
         10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
 
     errorHandler.assertNoError();