You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/04/11 12:35:03 UTC

incubator-ratis git commit: RATIS-224. In RpcTimeout, it should shut down the scheduler if there are no tasks.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master c692bf201 -> e37ab2ee1


RATIS-224. In RpcTimeout, it should shut down the scheduler if there are no tasks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e37ab2ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e37ab2ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e37ab2ee

Branch: refs/heads/master
Commit: e37ab2ee164fab84172513a72aa7175826b8dfd0
Parents: c692bf2
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Apr 11 20:34:18 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Apr 11 20:34:18 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/rpc/RpcTimeout.java   |  70 -------
 .../org/apache/ratis/util/TimeDuration.java     |   4 +
 .../org/apache/ratis/util/TimeoutScheduler.java | 123 +++++++++++
 .../grpc/client/RaftClientProtocolClient.java   |  13 +-
 .../ratis/grpc/server/GRpcLogAppender.java      |  14 +-
 .../apache/ratis/util/TestTimeoutScheduler.java | 210 +++++++++++++++++++
 6 files changed, 344 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
deleted file mode 100644
index 03d7eab..0000000
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.rpc;
-
-import org.apache.ratis.shaded.com.google.common.base.Supplier;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class RpcTimeout {
-  private static final Logger LOG = LoggerFactory.getLogger(RpcTimeout.class);
-  private ScheduledExecutorService timeoutScheduler = null;
-  private final TimeDuration callTimeout;
-  private int numUsers = 0;
-
-  public RpcTimeout(TimeDuration callTimeout) {
-    this.callTimeout = callTimeout;
-  }
-
-  public synchronized void addUser() {
-    if (timeoutScheduler == null) {
-      timeoutScheduler = Executors.newScheduledThreadPool(1);
-    }
-    numUsers++;
-  }
-
-  public synchronized void removeUser() {
-    numUsers--;
-    if (timeoutScheduler != null && numUsers == 0) {
-      timeoutScheduler.shutdown();
-      timeoutScheduler = null;
-    }
-  }
-
-  public synchronized void onTimeout(Runnable task, Supplier<String> errorMsg) {
-    Preconditions.assertTrue(timeoutScheduler != null);
-    TimeUnit unit = callTimeout.getUnit();
-    timeoutScheduler.schedule(() -> {
-      try {
-        task.run();
-      } catch (Throwable t) {
-        LOG.error(errorMsg.get(), t);
-      }
-    }, callTimeout.toInt(unit), unit);
-  }
-
-  public TimeDuration getCallTimeout() {
-    return callTimeout;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 8a3dc18..8a7c44a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -103,6 +103,10 @@ public class TimeDuration implements Comparable<TimeDuration> {
     this.unit = Objects.requireNonNull(unit, "unit = null");
   }
 
+  public long getDuration() { // the numeric part of this duration; interpret it together with getUnit()
+    return duration;
+  }
+
   public TimeUnit getUnit() {
     return unit;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
new file mode 100644
index 0000000..7a7d16c
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public final class TimeoutScheduler { // runs timeout tasks on an on-demand, single-threaded executor; shared via getInstance()
+  public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class); // public: the tests change its log level
+
+  private static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
+
+  private static final Supplier<TimeoutScheduler> INSTANCE = JavaUtils.memoize(TimeoutScheduler::new); // presumably created lazily and only once -- confirm against JavaUtils.memoize
+
+  public static TimeoutScheduler getInstance() { // returns the shared instance held by INSTANCE
+    return INSTANCE.get();
+  }
+
+  private TimeoutScheduler() {} // not instantiable from outside; instances come from INSTANCE only
+
+  /** The time to wait, once no tasks remain, before the idle scheduler is shut down. */
+  private final AtomicReference<TimeDuration> gracePeriod = new AtomicReference<>(DEFAULT_GRACE_PERIOD);
+
+  /** The number of tasks that have been scheduled but have not completed yet; only accessed in synchronized methods. */
+  private int numTasks = 0;
+  /** The ID given to the next scheduled task; a change of this value tells a pending shutdown that new tasks arrived. */
+  private int scheduleID = 0;
+  private ScheduledExecutorService scheduler = null; // null while no executor exists; only accessed in synchronized methods
+
+  TimeDuration getGracePeriod() { // package-private; read by onTaskCompleted when arming the shutdown
+    return gracePeriod.get();
+  }
+
+  void setGracePeriod(TimeDuration gracePeriod) { // package-private; the tests use it to shorten the grace period
+    this.gracePeriod.set(gracePeriod);
+  }
+
+  synchronized boolean hasScheduler() { // true from the first scheduled task until the idle shutdown has run
+    return scheduler != null;
+  }
+
+  /**
+   * Schedule the given task to run once after the given timeout has elapsed.
+   * Anything thrown by the task is passed to the error handler; the task is counted as completed either way.
+   * @param timeout the delay before the task is run.
+   * @param task the task to run when the timeout expires.
+   * @param errorHandler receives whatever the task throws (through an unchecked cast to THROWABLE).
+   */
+  public <THROWABLE extends Throwable> void onTimeout(
+      TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
+    onTimeout(timeout, sid -> {
+      LOG.debug("run a task: sid {}", sid);
+      try {
+        task.run();
+      } catch(Throwable t) { // NOTE(review): catches every Throwable, including ones that are not a THROWABLE
+        errorHandler.accept((THROWABLE) t); // unchecked cast; erased at runtime, so it cannot fail here
+      } finally {
+        onTaskCompleted(); // always decrement numTasks, even when the task or the handler threw
+      }
+    });
+  }
+
+  private synchronized void onTimeout(TimeDuration timeout, Consumer<Integer> toSchedule) { // creates the executor on demand and passes the task its schedule ID
+    if (scheduler == null) {
+      Preconditions.assertTrue(numTasks == 0); // without an executor there must be no outstanding tasks
+      LOG.debug("Initialize scheduler");
+      scheduler = Executors.newScheduledThreadPool(1);
+    }
+    numTasks++; // NOTE(review): not rolled back if schedule(..) below throws -- confirm whether that can happen
+    final int sid = scheduleID++; // changing scheduleID also invalidates any shutdown armed earlier
+
+    LOG.debug("schedule a task: timeout {}, sid {}", timeout, sid);
+    scheduler.schedule(() -> toSchedule.accept(sid), timeout.getDuration(), timeout.getUnit());
+  }
+
+  private synchronized void onTaskCompleted() { // called after every task; the last one to finish arms the idle shutdown
+    if (--numTasks == 0) {
+      final int sid = scheduleID; // snapshot; if it differs after the grace period, new tasks were scheduled meanwhile
+      final TimeDuration grace = getGracePeriod();
+      LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
+      scheduler.schedule(() -> tryShutdownScheduler(sid), grace.getDuration(), grace.getUnit());
+    }
+  }
+
+  private synchronized void tryShutdownScheduler(int sid) { // runs after the grace period with the snapshot taken in onTaskCompleted
+    if (sid == scheduleID) {
+      // No new tasks were submitted during the grace period; shut down the scheduler.
+      LOG.debug("shutdown scheduler: sid {}", sid);
+      scheduler.shutdown();
+      scheduler = null;
+    } else {
+      LOG.debug("shutdown cancelled: scheduleID has changed from {} to {}", sid, scheduleID);
+    }
+  }
+
+  /** Run the task on the shared instance when the timeout expires; log anything it throws as an error with the given message. */
+  public static <THROWABLE extends Throwable> void onTimeout(
+      TimeDuration timeout, CheckedRunnable<THROWABLE> task, Logger log, Supplier<String> errorMessage) {
+    getInstance().onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index e54cbbd..e90dad4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -20,7 +20,7 @@ package org.apache.ratis.grpc.client;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
-import org.apache.ratis.rpc.RpcTimeout;
+import org.apache.ratis.util.TimeoutScheduler;
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -139,7 +139,6 @@ public class RaftClientProtocolClient implements Closeable {
   class AsyncStreamObservers implements Closeable {
     /** Request map: callId -> future */
     private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
-    private final RpcTimeout rpcTimeout = new RpcTimeout(timeout);
     private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
       @Override
       public void onNext(RaftClientReplyProto proto) {
@@ -170,10 +169,6 @@ public class RaftClientProtocolClient implements Closeable {
     };
     private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
 
-    private AsyncStreamObservers() {
-      rpcTimeout.addUser();
-    }
-
     CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
       final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
       if (map == null) {
@@ -184,7 +179,8 @@ public class RaftClientProtocolClient implements Closeable {
           () -> getName() + ":" + getClass().getSimpleName());
       try {
         requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
-        rpcTimeout.onTimeout(() -> timeoutCheck(request), () -> "Timeout check failed for client request: " + request);
+        TimeoutScheduler.onTimeout(timeout, () -> timeoutCheck(request), LOG,
+            () -> "Timeout check failed for client request: " + request);
       } catch(Throwable t) {
         handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
       }
@@ -193,7 +189,7 @@ public class RaftClientProtocolClient implements Closeable {
 
     private void timeoutCheck(RaftClientRequest request) {
       handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
-          new IOException("Request timeout " + rpcTimeout.getCallTimeout() + ": " + request)));
+          new IOException("Request timeout " + timeout + ": " + request)));
     }
 
     private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
@@ -206,7 +202,6 @@ public class RaftClientProtocolClient implements Closeable {
     public void close() {
       requestStreamObserver.onCompleted();
       completeReplyExceptionally(null, "close");
-      rpcTimeout.removeUser();
     }
 
     private void completeReplyExceptionally(Throwable t, String event) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index eb2db91..64d0b23 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -20,7 +20,7 @@ package org.apache.ratis.grpc.server;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.RaftGRpcService;
 import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.rpc.RpcTimeout;
+import org.apache.ratis.util.TimeoutScheduler;
 import org.apache.ratis.server.impl.FollowerInfo;
 import org.apache.ratis.server.impl.LeaderState;
 import org.apache.ratis.server.impl.LogAppender;
@@ -58,8 +58,7 @@ public class GRpcLogAppender extends LogAppender {
 
   private final AppendLogResponseHandler appendResponseHandler;
   private final InstallSnapshotResponseHandler snapshotResponseHandler;
-  private static RpcTimeout rpcTimeout = new RpcTimeout(
-      TimeDuration.valueOf(3, TimeUnit.SECONDS));
+  private static TimeDuration rpcTimeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
 
   private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
   private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver;
@@ -76,7 +75,6 @@ public class GRpcLogAppender extends LogAppender {
 
     appendResponseHandler = new AppendLogResponseHandler();
     snapshotResponseHandler = new InstallSnapshotResponseHandler();
-    rpcTimeout.addUser();
   }
 
   @Override
@@ -162,7 +160,7 @@ public class GRpcLogAppender extends LogAppender {
         server.getId(), null, request);
 
     s.onNext(request);
-    rpcTimeout.onTimeout(() -> timeoutAppendRequest(request),
+    TimeoutScheduler.onTimeout(rpcTimeout, () -> timeoutAppendRequest(request), LOG,
         () -> "Timeout check failed for append entry request: " + request);
     follower.updateLastRpcSendTime();
   }
@@ -328,12 +326,6 @@ public class GRpcLogAppender extends LogAppender {
     }
   }
 
-  @Override
-  public LogAppender stopSender() {
-    rpcTimeout.removeUser();
-    return super.stopSender();
-  }
-
   private class InstallSnapshotResponseHandler
       implements StreamObserver<InstallSnapshotReplyProto> {
     private final Queue<Integer> pending;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
new file mode 100644
index 0000000..7c4ef4f
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+public class TestTimeoutScheduler { // NOTE(review): all tests rely on wall-clock sleeps with about 50ms of slack and share the singleton scheduler
+  { // instance initializer: log everything from the scheduler while the tests run
+    LogUtils.setLogLevel(TimeoutScheduler.LOG, Level.ALL);
+  }
+
+  static class ErrorHandler implements Consumer<RuntimeException> { // remembers whether any scheduled task reported an error
+    private final AtomicBoolean hasError = new AtomicBoolean(false);
+
+    @Override
+    public void accept(RuntimeException e) {
+      hasError.set(true);
+      TimeoutScheduler.LOG.error("Failed", e);
+    }
+
+    void assertNoError() {
+      Assert.assertFalse(hasError.get());
+    }
+  }
+
+  @Test(timeout = 1000)
+  public void testSingleTask() throws Exception { // one task at 250ms, grace 100ms: executor expected to vanish at about 350ms
+    final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+    final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+    scheduler.setGracePeriod(grace);
+    Assert.assertFalse(scheduler.hasScheduler()); // no executor before the first task is scheduled
+
+    final ErrorHandler errorHandler = new ErrorHandler();
+
+    final AtomicBoolean fired = new AtomicBoolean(false);
+    scheduler.onTimeout(TimeDuration.valueOf(250, TimeUnit.MILLISECONDS), () -> {
+      Assert.assertFalse(fired.get()); // the task must run at most once
+      fired.set(true);
+    }, errorHandler);
+    Assert.assertTrue(scheduler.hasScheduler());
+
+    Thread.sleep(100); // t ~ 100ms: the 250ms timeout has not expired yet
+    Assert.assertFalse(fired.get());
+    Assert.assertTrue(scheduler.hasScheduler());
+
+    Thread.sleep(100); // t ~ 200ms: still before the timeout
+    Assert.assertFalse(fired.get());
+    Assert.assertTrue(scheduler.hasScheduler());
+
+    Thread.sleep(100); // t ~ 300ms: the task has fired; the grace period is still running
+    Assert.assertTrue(fired.get());
+    Assert.assertTrue(scheduler.hasScheduler());
+
+    Thread.sleep(100); // t ~ 400ms: the grace period is over, so the executor is gone
+    Assert.assertTrue(fired.get());
+    Assert.assertFalse(scheduler.hasScheduler());
+
+    errorHandler.assertNoError();
+  }
+
+  @Test(timeout = 1000)
+  public void testMultipleTasks() throws Exception { // three tasks at 50ms, 150ms and 250ms; shutdown only after the last one
+    final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+    final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+    scheduler.setGracePeriod(grace);
+    Assert.assertFalse(scheduler.hasScheduler());
+
+    final ErrorHandler errorHandler = new ErrorHandler();
+
+    final AtomicBoolean[] fired = new AtomicBoolean[3];
+    for(int i = 0; i < fired.length; i++) {
+      final AtomicBoolean f = fired[i] = new AtomicBoolean(false);
+      scheduler.onTimeout(TimeDuration.valueOf(100*i + 50, TimeUnit.MILLISECONDS), () -> {
+        Assert.assertFalse(f.get());
+        f.set(true);
+      }, errorHandler);
+      Assert.assertTrue(scheduler.hasScheduler());
+    }
+
+    Thread.sleep(100); // t ~ 100ms: only the 50ms task has fired
+    Assert.assertTrue(fired[0].get());
+    Assert.assertFalse(fired[1].get());
+    Assert.assertFalse(fired[2].get());
+    Assert.assertTrue(scheduler.hasScheduler());
+
+    Thread.sleep(100); // t ~ 200ms: the 150ms task has fired as well
+    Assert.assertTrue(fired[0].get());
+    Assert.assertTrue(fired[1].get());
+    Assert.assertFalse(fired[2].get());
+    Assert.assertTrue(scheduler.hasScheduler());
+
+    Thread.sleep(100); // t ~ 300ms: all tasks have fired; the grace period is still running
+    Assert.assertTrue(fired[0].get());
+    Assert.assertTrue(fired[1].get());
+    Assert.assertTrue(fired[2].get());
+    Assert.assertTrue(scheduler.hasScheduler());
+
+    Thread.sleep(100); // t ~ 400ms: the grace period is over, so the executor is gone
+    Assert.assertTrue(fired[0].get());
+    Assert.assertTrue(fired[1].get());
+    Assert.assertTrue(fired[2].get());
+    Assert.assertFalse(scheduler.hasScheduler());
+
+    errorHandler.assertNoError();
+  }
+
+  @Test(timeout = 1000)
+  public void testExtendingGracePeriod() throws Exception { // a task submitted during the grace period must keep the executor alive
+    final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+    final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+    scheduler.setGracePeriod(grace);
+    Assert.assertFalse(scheduler.hasScheduler());
+
+    final ErrorHandler errorHandler = new ErrorHandler();
+
+    {
+      final AtomicBoolean fired = new AtomicBoolean(false);
+      scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> {
+        Assert.assertFalse(fired.get());
+        fired.set(true);
+      }, errorHandler);
+      Assert.assertTrue(scheduler.hasScheduler());
+
+      Thread.sleep(100); // t ~ 100ms: before the 150ms timeout
+      Assert.assertFalse(fired.get());
+      Assert.assertTrue(scheduler.hasScheduler());
+
+      Thread.sleep(100); // t ~ 200ms: the first task has fired; its shutdown is armed for about 250ms
+      Assert.assertTrue(fired.get());
+      Assert.assertTrue(scheduler.hasScheduler());
+    }
+
+    {
+      // Submit another task during the grace period; the pending shutdown must then be a no-op.
+      final AtomicBoolean fired2 = new AtomicBoolean(false);
+      scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> {
+        Assert.assertFalse(fired2.get());
+        fired2.set(true);
+      }, errorHandler);
+
+      Thread.sleep(100); // t ~ 300ms: the second task (due at about 350ms) is pending, so the executor survived
+      Assert.assertFalse(fired2.get());
+      Assert.assertTrue(scheduler.hasScheduler());
+
+      Thread.sleep(100); // t ~ 400ms: the second task has fired; a new grace period is running
+      Assert.assertTrue(fired2.get());
+      Assert.assertTrue(scheduler.hasScheduler());
+
+      Thread.sleep(100); // t ~ 500ms: the second grace period is over, so the executor is gone
+      Assert.assertTrue(fired2.get());
+      Assert.assertFalse(scheduler.hasScheduler());
+    }
+
+    errorHandler.assertNoError();
+  }
+
+  @Test(timeout = 1000)
+  public void testRestartingScheduler() throws Exception { // after an idle shutdown, the next task must create a new executor
+    final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+    final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+    scheduler.setGracePeriod(grace);
+    Assert.assertFalse(scheduler.hasScheduler());
+
+    final ErrorHandler errorHandler = new ErrorHandler();
+
+    for(int i = 0; i < 2; i++) { // the second iteration starts with the executor already shut down
+      final AtomicBoolean fired = new AtomicBoolean(false);
+      scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> {
+        Assert.assertFalse(fired.get());
+        fired.set(true);
+      }, errorHandler);
+      Assert.assertTrue(scheduler.hasScheduler());
+
+      Thread.sleep(100); // ~100ms into the iteration: before the 150ms timeout
+      Assert.assertFalse(fired.get());
+      Assert.assertTrue(scheduler.hasScheduler());
+
+      Thread.sleep(100); // ~200ms: the task has fired; the grace period is still running
+      Assert.assertTrue(fired.get());
+      Assert.assertTrue(scheduler.hasScheduler());
+
+      Thread.sleep(100); // ~300ms: the grace period is over, so the executor is gone
+      Assert.assertTrue(fired.get());
+      Assert.assertFalse(scheduler.hasScheduler());
+    }
+
+    errorHandler.assertNoError();
+  }
+}