You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/04/11 12:35:03 UTC
incubator-ratis git commit: RATIS-224. In RpcTimeout,
it should shut down the scheduler if there is no tasks.
Repository: incubator-ratis
Updated Branches:
refs/heads/master c692bf201 -> e37ab2ee1
RATIS-224. In RpcTimeout, it should shut down the scheduler if there is no tasks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e37ab2ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e37ab2ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e37ab2ee
Branch: refs/heads/master
Commit: e37ab2ee164fab84172513a72aa7175826b8dfd0
Parents: c692bf2
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Apr 11 20:34:18 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Apr 11 20:34:18 2018 +0800
----------------------------------------------------------------------
.../java/org/apache/ratis/rpc/RpcTimeout.java | 70 -------
.../org/apache/ratis/util/TimeDuration.java | 4 +
.../org/apache/ratis/util/TimeoutScheduler.java | 123 +++++++++++
.../grpc/client/RaftClientProtocolClient.java | 13 +-
.../ratis/grpc/server/GRpcLogAppender.java | 14 +-
.../apache/ratis/util/TestTimeoutScheduler.java | 210 +++++++++++++++++++
6 files changed, 344 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
deleted file mode 100644
index 03d7eab..0000000
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.rpc;
-
-import org.apache.ratis.shaded.com.google.common.base.Supplier;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class RpcTimeout {
- private static final Logger LOG = LoggerFactory.getLogger(RpcTimeout.class);
- private ScheduledExecutorService timeoutScheduler = null;
- private final TimeDuration callTimeout;
- private int numUsers = 0;
-
- public RpcTimeout(TimeDuration callTimeout) {
- this.callTimeout = callTimeout;
- }
-
- public synchronized void addUser() {
- if (timeoutScheduler == null) {
- timeoutScheduler = Executors.newScheduledThreadPool(1);
- }
- numUsers++;
- }
-
- public synchronized void removeUser() {
- numUsers--;
- if (timeoutScheduler != null && numUsers == 0) {
- timeoutScheduler.shutdown();
- timeoutScheduler = null;
- }
- }
-
- public synchronized void onTimeout(Runnable task, Supplier<String> errorMsg) {
- Preconditions.assertTrue(timeoutScheduler != null);
- TimeUnit unit = callTimeout.getUnit();
- timeoutScheduler.schedule(() -> {
- try {
- task.run();
- } catch (Throwable t) {
- LOG.error(errorMsg.get(), t);
- }
- }, callTimeout.toInt(unit), unit);
- }
-
- public TimeDuration getCallTimeout() {
- return callTimeout;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 8a3dc18..8a7c44a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -103,6 +103,10 @@ public class TimeDuration implements Comparable<TimeDuration> {
this.unit = Objects.requireNonNull(unit, "unit = null");
}
+ public long getDuration() {
+ return duration;
+ }
+
public TimeUnit getUnit() {
return unit;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
new file mode 100644
index 0000000..7a7d16c
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public final class TimeoutScheduler {
+ public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class);
+
+ private static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
+
+ private static final Supplier<TimeoutScheduler> INSTANCE = JavaUtils.memoize(TimeoutScheduler::new);
+
+ public static TimeoutScheduler getInstance() {
+ return INSTANCE.get();
+ }
+
+ private TimeoutScheduler() {}
+
+ /** When there is no tasks, the time period to wait before shutting down the scheduler. */
+ private final AtomicReference<TimeDuration> gracePeriod = new AtomicReference<>(DEFAULT_GRACE_PERIOD);
+
+ /** The number of scheduled tasks. */
+ private int numTasks = 0;
+ /** The scheduleID for each task */
+ private int scheduleID = 0;
+ private ScheduledExecutorService scheduler = null;
+
+ TimeDuration getGracePeriod() {
+ return gracePeriod.get();
+ }
+
+ void setGracePeriod(TimeDuration gracePeriod) {
+ this.gracePeriod.set(gracePeriod);
+ }
+
+ synchronized boolean hasScheduler() {
+ return scheduler != null;
+ }
+
+ /**
+ * Schedule a timeout task.
+ *
+ * @param timeout the timeout value.
+ * @param task the task to run when timeout.
+ * @param errorHandler to handle the error, if there is any.
+ */
+ public <THROWABLE extends Throwable> void onTimeout(
+ TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
+ onTimeout(timeout, sid -> {
+ LOG.debug("run a task: sid {}", sid);
+ try {
+ task.run();
+ } catch(Throwable t) {
+ errorHandler.accept((THROWABLE) t);
+ } finally {
+ onTaskCompleted();
+ }
+ });
+ }
+
+ private synchronized void onTimeout(TimeDuration timeout, Consumer<Integer> toSchedule) {
+ if (scheduler == null) {
+ Preconditions.assertTrue(numTasks == 0);
+ LOG.debug("Initialize scheduler");
+ scheduler = Executors.newScheduledThreadPool(1);
+ }
+ numTasks++;
+ final int sid = scheduleID++;
+
+ LOG.debug("schedule a task: timeout {}, sid {}", timeout, sid);
+ scheduler.schedule(() -> toSchedule.accept(sid), timeout.getDuration(), timeout.getUnit());
+ }
+
+ private synchronized void onTaskCompleted() {
+ if (--numTasks == 0) {
+ final int sid = scheduleID;
+ final TimeDuration grace = getGracePeriod();
+ LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
+ scheduler.schedule(() -> tryShutdownScheduler(sid), grace.getDuration(), grace.getUnit());
+ }
+ }
+
+ private synchronized void tryShutdownScheduler(int sid) {
+ if (sid == scheduleID) {
+ // No new tasks submitted, shutdown the scheduler.
+ LOG.debug("shutdown scheduler: sid {}", sid);
+ scheduler.shutdown();
+ scheduler = null;
+ } else {
+ LOG.debug("shutdown cancelled: scheduleID has changed from {} to {}", sid, scheduleID);
+ }
+ }
+
+ /** When timeout, run the task. Log the error, if there is any. */
+ public static <THROWABLE extends Throwable> void onTimeout(
+ TimeDuration timeout, CheckedRunnable<THROWABLE> task, Logger log, Supplier<String> errorMessage) {
+ getInstance().onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index e54cbbd..e90dad4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -20,7 +20,7 @@ package org.apache.ratis.grpc.client;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.*;
-import org.apache.ratis.rpc.RpcTimeout;
+import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -139,7 +139,6 @@ public class RaftClientProtocolClient implements Closeable {
class AsyncStreamObservers implements Closeable {
/** Request map: callId -> future */
private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
- private final RpcTimeout rpcTimeout = new RpcTimeout(timeout);
private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
@Override
public void onNext(RaftClientReplyProto proto) {
@@ -170,10 +169,6 @@ public class RaftClientProtocolClient implements Closeable {
};
private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
- private AsyncStreamObservers() {
- rpcTimeout.addUser();
- }
-
CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
if (map == null) {
@@ -184,7 +179,8 @@ public class RaftClientProtocolClient implements Closeable {
() -> getName() + ":" + getClass().getSimpleName());
try {
requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
- rpcTimeout.onTimeout(() -> timeoutCheck(request), () -> "Timeout check failed for client request: " + request);
+ TimeoutScheduler.onTimeout(timeout, () -> timeoutCheck(request), LOG,
+ () -> "Timeout check failed for client request: " + request);
} catch(Throwable t) {
handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
}
@@ -193,7 +189,7 @@ public class RaftClientProtocolClient implements Closeable {
private void timeoutCheck(RaftClientRequest request) {
handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
- new IOException("Request timeout " + rpcTimeout.getCallTimeout() + ": " + request)));
+ new IOException("Request timeout " + timeout + ": " + request)));
}
private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
@@ -206,7 +202,6 @@ public class RaftClientProtocolClient implements Closeable {
public void close() {
requestStreamObserver.onCompleted();
completeReplyExceptionally(null, "close");
- rpcTimeout.removeUser();
}
private void completeReplyExceptionally(Throwable t, String event) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index eb2db91..64d0b23 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -20,7 +20,7 @@ package org.apache.ratis.grpc.server;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.RaftGRpcService;
import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.rpc.RpcTimeout;
+import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.server.impl.FollowerInfo;
import org.apache.ratis.server.impl.LeaderState;
import org.apache.ratis.server.impl.LogAppender;
@@ -58,8 +58,7 @@ public class GRpcLogAppender extends LogAppender {
private final AppendLogResponseHandler appendResponseHandler;
private final InstallSnapshotResponseHandler snapshotResponseHandler;
- private static RpcTimeout rpcTimeout = new RpcTimeout(
- TimeDuration.valueOf(3, TimeUnit.SECONDS));
+ private static TimeDuration rpcTimeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver;
@@ -76,7 +75,6 @@ public class GRpcLogAppender extends LogAppender {
appendResponseHandler = new AppendLogResponseHandler();
snapshotResponseHandler = new InstallSnapshotResponseHandler();
- rpcTimeout.addUser();
}
@Override
@@ -162,7 +160,7 @@ public class GRpcLogAppender extends LogAppender {
server.getId(), null, request);
s.onNext(request);
- rpcTimeout.onTimeout(() -> timeoutAppendRequest(request),
+ TimeoutScheduler.onTimeout(rpcTimeout, () -> timeoutAppendRequest(request), LOG,
() -> "Timeout check failed for append entry request: " + request);
follower.updateLastRpcSendTime();
}
@@ -328,12 +326,6 @@ public class GRpcLogAppender extends LogAppender {
}
}
- @Override
- public LogAppender stopSender() {
- rpcTimeout.removeUser();
- return super.stopSender();
- }
-
private class InstallSnapshotResponseHandler
implements StreamObserver<InstallSnapshotReplyProto> {
private final Queue<Integer> pending;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
new file mode 100644
index 0000000..7c4ef4f
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+public class TestTimeoutScheduler {
+ {
+ LogUtils.setLogLevel(TimeoutScheduler.LOG, Level.ALL);
+ }
+
+ static class ErrorHandler implements Consumer<RuntimeException> {
+ private final AtomicBoolean hasError = new AtomicBoolean(false);
+
+ @Override
+ public void accept(RuntimeException e) {
+ hasError.set(true);
+ TimeoutScheduler.LOG.error("Failed", e);
+ }
+
+ void assertNoError() {
+ Assert.assertFalse(hasError.get());
+ }
+ }
+
+ @Test(timeout = 1000)
+ public void testSingleTask() throws Exception {
+ final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ scheduler.setGracePeriod(grace);
+ Assert.assertFalse(scheduler.hasScheduler());
+
+ final ErrorHandler errorHandler = new ErrorHandler();
+
+ final AtomicBoolean fired = new AtomicBoolean(false);
+ scheduler.onTimeout(TimeDuration.valueOf(250, TimeUnit.MILLISECONDS), () -> {
+ Assert.assertFalse(fired.get());
+ fired.set(true);
+ }, errorHandler);
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertFalse(fired.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertFalse(fired.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired.get());
+ Assert.assertFalse(scheduler.hasScheduler());
+
+ errorHandler.assertNoError();
+ }
+
+ @Test(timeout = 1000)
+ public void testMultipleTasks() throws Exception {
+ final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ scheduler.setGracePeriod(grace);
+ Assert.assertFalse(scheduler.hasScheduler());
+
+ final ErrorHandler errorHandler = new ErrorHandler();
+
+ final AtomicBoolean[] fired = new AtomicBoolean[3];
+ for(int i = 0; i < fired.length; i++) {
+ final AtomicBoolean f = fired[i] = new AtomicBoolean(false);
+ scheduler.onTimeout(TimeDuration.valueOf(100*i + 50, TimeUnit.MILLISECONDS), () -> {
+ Assert.assertFalse(f.get());
+ f.set(true);
+ }, errorHandler);
+ Assert.assertTrue(scheduler.hasScheduler());
+ }
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired[0].get());
+ Assert.assertFalse(fired[1].get());
+ Assert.assertFalse(fired[2].get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired[0].get());
+ Assert.assertTrue(fired[1].get());
+ Assert.assertFalse(fired[2].get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired[0].get());
+ Assert.assertTrue(fired[1].get());
+ Assert.assertTrue(fired[2].get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired[0].get());
+ Assert.assertTrue(fired[1].get());
+ Assert.assertTrue(fired[2].get());
+ Assert.assertFalse(scheduler.hasScheduler());
+
+ errorHandler.assertNoError();
+ }
+
+ @Test(timeout = 1000)
+ public void testExtendingGracePeriod() throws Exception {
+ final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ scheduler.setGracePeriod(grace);
+ Assert.assertFalse(scheduler.hasScheduler());
+
+ final ErrorHandler errorHandler = new ErrorHandler();
+
+ {
+ final AtomicBoolean fired = new AtomicBoolean(false);
+ scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> {
+ Assert.assertFalse(fired.get());
+ fired.set(true);
+ }, errorHandler);
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertFalse(fired.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+ }
+
+ {
+ // submit another task during grace period
+ final AtomicBoolean fired2 = new AtomicBoolean(false);
+ scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> {
+ Assert.assertFalse(fired2.get());
+ fired2.set(true);
+ }, errorHandler);
+
+ Thread.sleep(100);
+ Assert.assertFalse(fired2.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired2.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired2.get());
+ Assert.assertFalse(scheduler.hasScheduler());
+ }
+
+ errorHandler.assertNoError();
+ }
+
+ @Test(timeout = 1000)
+ public void testRestartingScheduler() throws Exception {
+ final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ scheduler.setGracePeriod(grace);
+ Assert.assertFalse(scheduler.hasScheduler());
+
+ final ErrorHandler errorHandler = new ErrorHandler();
+
+ for(int i = 0; i < 2; i++) {
+ final AtomicBoolean fired = new AtomicBoolean(false);
+ scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> {
+ Assert.assertFalse(fired.get());
+ fired.set(true);
+ }, errorHandler);
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertFalse(fired.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired.get());
+ Assert.assertTrue(scheduler.hasScheduler());
+
+ Thread.sleep(100);
+ Assert.assertTrue(fired.get());
+ Assert.assertFalse(scheduler.hasScheduler());
+ }
+
+ errorHandler.assertNoError();
+ }
+}