You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/04/09 16:34:38 UTC

[geode] branch develop updated: GEODE-6603: Create StoppableCountDownLatch unit tests (#3413)

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f7ae070  GEODE-6603: Create StoppableCountDownLatch unit tests (#3413)
f7ae070 is described below

commit f7ae0704415e9fcc26b2d24d4dd1e0dcb5f8d06d
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue Apr 9 09:34:27 2019 -0700

    GEODE-6603: Create StoppableCountDownLatch unit tests (#3413)
    
    * Create StoppableCountDownLatchTest and fixup any issues uncovered by
    testing.
    * Remove unused class StoppableCountDownOrUpLatch.
    * Improve ExecutorServiceRule for StoppableCountDownLatchTest.
---
 .../cache/BucketCreationCrashRegressionTest.java   |   4 +-
 .../util/concurrent/StoppableCountDownLatch.java   | 100 +++++---
 .../concurrent/StoppableCountDownOrUpLatch.java    | 210 ---------------
 .../concurrent/StoppableCountDownLatchTest.java    | 283 +++++++++++++++++++++
 .../test/junit/rules/ExecutorServiceRule.java      |  44 +++-
 5 files changed, 387 insertions(+), 254 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
index 9f3316a..3c97f97 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTE
 import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.DistributedTestUtils.crashDistributedSystem;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
@@ -172,7 +173,8 @@ public class BucketCreationCrashRegressionTest implements Serializable {
         .isInstanceOf(RMIException.class)
         .hasCauseInstanceOf(DistributedSystemDisconnectedException.class);
 
-    assertThat(server2.invoke(() -> getBucketList())).containsExactly(3);
+    await()
+        .untilAsserted(() -> assertThat(server2.invoke(() -> getBucketList())).containsExactly(3));
 
     // This shouldn't hang, because the bucket creation should finish.
     server2.invoke(() -> putData(3, 4, "a"));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java
index 6336404..6c6576a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java
@@ -14,11 +14,14 @@
  */
 package org.apache.geode.internal.util.concurrent;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.CancelCriterion;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
 
 /**
@@ -26,75 +29,100 @@ import org.apache.geode.internal.Assert;
  */
 public class StoppableCountDownLatch {
 
-  /**
-   * This is how often waiters will wake up to check for cancellation
-   */
-  static final long RETRY_TIME = Long
-      .getLong(DistributionConfig.GEMFIRE_PREFIX + "stoppable-retry-interval", 2000).longValue();
+  static final String RETRY_TIME_MILLIS_PROPERTY = GEMFIRE_PREFIX + "stoppable-retry-interval";
+  static final long RETRY_TIME_MILLIS_DEFAULT = 2000;
 
+  private final CountDownLatch delegate;
 
-  /**
-   * The underlying latch
-   */
-  private final CountDownLatch latch;
+  private final CancelCriterion stopper;
 
   /**
-   * The cancellation criterion
+   * This is how often waiters will wake up to check for cancellation
    */
-  private final CancelCriterion stopper;
+  private final long retryIntervalNanos;
+
+  private final NanoTimer nanoTimer;
 
   /**
+   * @param stopper the CancelCriterion to check before awaiting
    * @param count the number of times {@link #countDown} must be invoked before threads can pass
    *        through {@link #await()}
    *
    * @throws IllegalArgumentException if {@code count} is negative
    */
-  public StoppableCountDownLatch(CancelCriterion stopper, int count) {
+  public StoppableCountDownLatch(final CancelCriterion stopper, final int count) {
+    this(stopper, count,
+        MILLISECONDS.toNanos(Long.getLong(RETRY_TIME_MILLIS_PROPERTY, RETRY_TIME_MILLIS_DEFAULT)),
+        System::nanoTime);
+  }
+
+  StoppableCountDownLatch(final CancelCriterion stopper, final int count,
+      final long retryIntervalNanos, final NanoTimer nanoTimer) {
     Assert.assertTrue(stopper != null);
-    this.latch = new CountDownLatch(count);
+    delegate = new CountDownLatch(count);
     this.stopper = stopper;
+    this.retryIntervalNanos = retryIntervalNanos;
+    this.nanoTimer = nanoTimer;
   }
 
   public void await() throws InterruptedException {
-    for (;;) {
+    do {
       stopper.checkCancelInProgress(null);
-      if (latch.await(RETRY_TIME, TimeUnit.MILLISECONDS)) {
-        break;
-      }
+    } while (!delegate.await(retryIntervalNanos, NANOSECONDS));
+  }
+
+  public boolean await(final long timeout, final TimeUnit unit) throws InterruptedException {
+    stopper.checkCancelInProgress(null);
+    long timeoutNanos = unit.toNanos(timeout);
+    if (timeoutNanos > retryIntervalNanos) {
+      return awaitWithCheck(timeoutNanos);
     }
+    return delegate.await(timeoutNanos, NANOSECONDS);
   }
 
   /**
-   * @param msTimeout how long to wait in milliseconds
+   * @param timeoutMillis how long to wait in milliseconds
    *
    * @return true if it was unlatched
    */
-  public boolean await(long msTimeout) throws InterruptedException {
+  public boolean await(final long timeoutMillis) throws InterruptedException {
     stopper.checkCancelInProgress(null);
-    return latch.await(msTimeout, TimeUnit.MILLISECONDS);
-  }
-
-  public boolean await(final long timeout, final TimeUnit unit) throws InterruptedException {
-    stopper.checkCancelInProgress(null);
-    return latch.await(timeout, unit);
+    long timeoutNanos = MILLISECONDS.toNanos(timeoutMillis);
+    if (timeoutNanos > retryIntervalNanos) {
+      return awaitWithCheck(timeoutNanos);
+    }
+    return delegate.await(timeoutNanos, NANOSECONDS);
   }
 
-  public synchronized void countDown() {
-    latch.countDown();
+  public void countDown() {
+    delegate.countDown();
   }
 
-  /**
-   * @return the current count
-   */
   public long getCount() {
-    return latch.getCount();
+    return delegate.getCount();
   }
 
-  /**
-   * @return a string identifying this latch, as well as its state
-   */
   @Override
   public String toString() {
-    return "(Stoppable) " + latch.toString();
+    return "(Stoppable) " + delegate;
+  }
+
+  long retryIntervalNanos() {
+    return retryIntervalNanos;
+  }
+
+  private boolean awaitWithCheck(final long timeoutNanos) throws InterruptedException {
+    long startNanos = nanoTimer.nanoTime();
+    boolean unlatched;
+    do {
+      stopper.checkCancelInProgress(null);
+      unlatched = delegate.await(retryIntervalNanos, NANOSECONDS);
+    } while (!unlatched && nanoTimer.nanoTime() - startNanos < timeoutNanos);
+    return unlatched;
+  }
+
+  @FunctionalInterface
+  interface NanoTimer {
+    long nanoTime();
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownOrUpLatch.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownOrUpLatch.java
deleted file mode 100644
index c860139..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownOrUpLatch.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.util.concurrent;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.Assert;
-
-/**
- * Extends the CountDownLatch with the ability to also countUp.
- * <p>
- * Based on the original Doug Lea backport implementation of CountDownLatch.
- *
- * @see java.util.concurrent.CountDownLatch
- */
-public class StoppableCountDownOrUpLatch {
-
-  private int count_;
-
-  /**
-   * The cancellation criterion
-   */
-  private CancelCriterion stopper;
-
-  /**
-   * Constructs a <code>CountDownLatch</code> initialized with the given count.
-   *
-   * @param count the number of times {@link #countDown} must be invoked before threads can pass
-   *        through {@link #await()}
-   * @throws IllegalArgumentException if <code>count</count> is negative
-   */
-  public StoppableCountDownOrUpLatch(CancelCriterion stopper, int count) {
-    Assert.assertTrue(stopper != null);
-    if (count < 0)
-      throw new IllegalArgumentException(
-          "count < 0");
-    this.stopper = stopper;
-    this.count_ = count;
-  }
-
-  /**
-   * Causes the current thread to wait until the latch has counted down to zero, unless the thread
-   * is {@linkplain Thread#interrupt interrupted}.
-   *
-   * <p>
-   * If the current count is zero then this method returns immediately.
-   *
-   * <p>
-   * If the current count is greater than zero then the current thread becomes disabled for thread
-   * scheduling purposes and lies dormant until one of two things happen:
-   * <ul>
-   * <li>The count reaches zero due to invocations of the {@link #countDown} method; or
-   * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current thread.
-   * </ul>
-   *
-   * <p>
-   * If the current thread:
-   * <ul>
-   * <li>has its interrupted status set on entry to this method; or
-   * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
-   * </ul>
-   * then {@link InterruptedException} is thrown and the current thread's interrupted status is
-   * cleared.
-   *
-   * @throws InterruptedException if the current thread is interrupted while waiting
-   */
-  public void await() throws InterruptedException {
-    if (Thread.interrupted())
-      throw new InterruptedException();
-    // Modified to use inner primitive repeatedly, checking
-    // for cancellation
-    for (;;) {
-      stopper.checkCancelInProgress(null);
-      if (await(StoppableCountDownLatch.RETRY_TIME))
-        break;
-    }
-  }
-
-  private static final long NANOS_PER_MS = 1000000;
-
-  /**
-   * Causes the current thread to wait until the latch has counted down to zero, unless the thread
-   * is {@linkplain Thread#interrupt interrupted}, or the specified waiting time elapses.
-   *
-   * <p>
-   * If the current count is zero then this method returns immediately with the value
-   * <code>true</code>.
-   *
-   * <p>
-   * If the current count is greater than zero then the current thread becomes disabled for thread
-   * scheduling purposes and lies dormant until one of three things happen:
-   * <ul>
-   * <li>The count reaches zero due to invocations of the {@link #countDown} method; or
-   * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current thread; or
-   * <li>The specified waiting time elapses.
-   * </ul>
-   *
-   * <p>
-   * If the count reaches zero then the method returns with the value <code>true</code>.
-   *
-   * <p>
-   * If the current thread:
-   * <ul>
-   * <li>has its interrupted status set on entry to this method; or
-   * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
-   * </ul>
-   * then {@link InterruptedException} is thrown and the current thread's interrupted status is
-   * cleared.
-   *
-   * <p>
-   * If the specified waiting time elapses then the value <code>false</code> is returned. If the
-   * time is less than or equal to zero, the method will not wait at all.
-   *
-   * @param msTimeout the maximum time to wait in milliseconds
-   * @return <code>true</code> if the count reached zero and <code>false</code> if the waiting time
-   *         elapsed before the count reached zero
-   * @throws InterruptedException if the current thread is interrupted while waiting
-   */
-  public boolean await(long msTimeout) throws InterruptedException {
-    if (Thread.interrupted())
-      throw new InterruptedException();
-    long nanos = msTimeout * NANOS_PER_MS; // millis to nanos
-    synchronized (this) {
-      if (count_ <= 0)
-        return true;
-      else if (nanos <= 0)
-        return false;
-      else {
-        long deadline = System.nanoTime() + nanos;
-        for (;;) {
-          stopper.checkCancelInProgress(null);
-          wait(nanos / NANOS_PER_MS, (int) (nanos % NANOS_PER_MS));
-          if (count_ <= 0)
-            return true;
-          else {
-            nanos = deadline - System.nanoTime();
-            if (nanos <= 0)
-              return false;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
-   *
-   * <p>
-   * If the current count is greater than zero then it is decremented. If the new count is zero then
-   * all waiting threads are re-enabled for thread scheduling purposes.
-   *
-   * <p>
-   * If the current count equals zero then nothing happens.
-   */
-  public synchronized void countDown() {
-    if (count_ == 0)
-      return;
-    if (--count_ == 0)
-      notifyAll();
-  }
-
-  /**
-   * Returns the current count.
-   *
-   * <p>
-   * This method is typically used for debugging and testing purposes.
-   *
-   * @return the current count
-   */
-  public long getCount() {
-    return count_;
-  }
-
-  /**
-   * Returns a string identifying this latch, as well as its state. The state, in brackets, includes
-   * the String <code>"Count ="</code> followed by the current count.
-   *
-   * @return a string identifying this latch, as well as its state
-   */
-  @Override
-  public String toString() {
-    return super.toString() + "[Count = " + getCount() + "]";
-  }
-
-  /**
-   * [GemStone addition]
-   */
-  public synchronized void countUp() {
-    this.count_++;
-  }
-
-  /**
-   * [GemStone addition]
-   */
-  public synchronized long getCountSync() {
-    return getCount();
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatchTest.java b/geode-core/src/test/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatchTest.java
new file mode 100644
index 0000000..9419e11
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatchTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.util.concurrent;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.geode.internal.util.concurrent.StoppableCountDownLatch.RETRY_TIME_MILLIS_DEFAULT;
+import static org.apache.geode.internal.util.concurrent.StoppableCountDownLatch.RETRY_TIME_MILLIS_PROPERTY;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.rules.ErrorCollector;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class StoppableCountDownLatchTest {
+
+  private static final long TIMEOUT_MILLIS = getTimeout().getValueInMS();
+
+  private CancelCriterion stopper;
+
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  @Before
+  public void setUp() {
+    stopper = mock(CancelCriterion.class);
+  }
+
+  @Test
+  public void defaultRetryIntervalNanosIsTwoSeconds() {
+    long twoSeconds = 2;
+    StoppableCountDownLatch latch = new StoppableCountDownLatch(stopper, 1);
+
+    assertThat(NANOSECONDS.toSeconds(latch.retryIntervalNanos()))
+        .isEqualTo(MILLISECONDS.toSeconds(RETRY_TIME_MILLIS_DEFAULT))
+        .isEqualTo(twoSeconds);
+  }
+
+  @Test
+  public void defaultRetryIntervalNanosIsOverriddenBySystemProperty() {
+    long theRetryTimeMillis = 42;
+    System.setProperty(RETRY_TIME_MILLIS_PROPERTY, String.valueOf(theRetryTimeMillis));
+
+    StoppableCountDownLatch latch = new StoppableCountDownLatch(stopper, 1);
+
+    assertThat(NANOSECONDS.toMillis(latch.retryIntervalNanos())).isEqualTo(theRetryTimeMillis);
+  }
+
+  @Test
+  public void awaitReturnsAfterCountDown() {
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> latch.await());
+
+    latch.countDown();
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+  }
+
+  @Test
+  public void awaitIsInterruptible() {
+    int theCount = 1;
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+    AtomicReference<Thread> theThread = new AtomicReference<>();
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> {
+      theThread.set(Thread.currentThread());
+      Throwable thrown = catchThrowable(() -> latch.await());
+      errorCollector
+          .checkSucceeds(() -> assertThat(thrown).isInstanceOf(InterruptedException.class));
+    });
+
+    await().until(() -> theThread.get() != null);
+
+    theThread.get().interrupt();
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+    assertThat(latch.getCount()).isEqualTo(theCount);
+  }
+
+  @Test
+  public void awaitIsCancelable() {
+    int theCount = 1;
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+    AtomicReference<Thread> theThread = new AtomicReference<>();
+    String cancelMessage = "cancel";
+
+    doNothing()
+        .doThrow(new CancelException(cancelMessage) {})
+        .when(stopper).checkCancelInProgress(any());
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> {
+      theThread.set(Thread.currentThread());
+      Throwable thrown = catchThrowable(() -> latch.await());
+      errorCollector.checkSucceeds(
+          () -> assertThat(thrown).isInstanceOf(CancelException.class).hasMessage(cancelMessage));
+    });
+
+    await().until(() -> theThread.get() != null);
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+    assertThat(latch.getCount()).isEqualTo(theCount);
+  }
+
+  @Test
+  public void awaitWithTimeoutAndTimeUnitReturnsTrueAfterCountDown() throws Exception {
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+
+    Future<Boolean> latchFuture =
+        executorServiceRule.submit(() -> latch.await(TIMEOUT_MILLIS, MILLISECONDS));
+
+    latch.countDown();
+
+    assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isTrue();
+  }
+
+  @Test
+  public void awaitWithTimeoutAndTimeUnitReturnsFalseAfterTimeout() throws Exception {
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+    long theTimeoutMillis = 2;
+    long startNanos = System.nanoTime();
+
+    Future<Boolean> latchFuture =
+        executorServiceRule.submit(() -> latch.await(theTimeoutMillis, MILLISECONDS));
+
+    assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isFalse();
+    assertThat(System.nanoTime() - startNanos).isGreaterThanOrEqualTo(theTimeoutMillis);
+  }
+
+  @Test
+  public void awaitWithTimeoutAndTimeUnitIsInterruptible() {
+    int theCount = 1;
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+    AtomicReference<Thread> theThread = new AtomicReference<>();
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> {
+      theThread.set(Thread.currentThread());
+      Throwable thrown = catchThrowable(() -> latch.await(TIMEOUT_MILLIS, MILLISECONDS));
+      errorCollector
+          .checkSucceeds(() -> assertThat(thrown).isInstanceOf(InterruptedException.class));
+    });
+
+    await().until(() -> theThread.get() != null);
+
+    theThread.get().interrupt();
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+    assertThat(latch.getCount()).isEqualTo(theCount);
+  }
+
+  @Test
+  public void awaitWithTimeoutAndTimeUnitIsCancelableAtBeginning() {
+    int theCount = 1;
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+    AtomicReference<Thread> theThread = new AtomicReference<>();
+    String cancelMessage = "cancel";
+
+    doThrow(new CancelException(cancelMessage) {}).when(stopper).checkCancelInProgress(any());
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> {
+      theThread.set(Thread.currentThread());
+      Throwable thrown = catchThrowable(() -> latch.await(TIMEOUT_MILLIS, MILLISECONDS));
+      errorCollector.checkSucceeds(
+          () -> assertThat(thrown).isInstanceOf(CancelException.class).hasMessage(cancelMessage));
+    });
+
+    await().until(() -> theThread.get() != null);
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+    assertThat(latch.getCount()).isEqualTo(theCount);
+  }
+
+  @Test
+  public void awaitWithTimeoutMillisReturnsTrueAfterCountDown() throws Exception {
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+
+    Future<Boolean> latchFuture = executorServiceRule.submit(() -> latch.await(TIMEOUT_MILLIS));
+
+    latch.countDown();
+
+    assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isTrue();
+  }
+
+  @Test
+  public void awaitWithTimeoutMillisReturnsFalseAfterTimeout() throws Exception {
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+    long theTimeoutMillis = 2;
+    long startNanos = System.nanoTime();
+
+    Future<Boolean> latchFuture = executorServiceRule.submit(() -> latch.await(theTimeoutMillis));
+
+    assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isFalse();
+    assertThat(System.nanoTime() - startNanos).isGreaterThanOrEqualTo(theTimeoutMillis);
+  }
+
+  @Test
+  public void awaitWithTimeoutMillisIsInterruptible() {
+    int theCount = 1;
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+    AtomicReference<Thread> theThread = new AtomicReference<>();
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> {
+      theThread.set(Thread.currentThread());
+      Throwable thrown = catchThrowable(() -> latch.await(getTimeout().getValueInMS()));
+      errorCollector
+          .checkSucceeds(() -> assertThat(thrown).isInstanceOf(InterruptedException.class));
+    });
+
+    await().until(() -> theThread.get() != null);
+
+    theThread.get().interrupt();
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+    assertThat(latch.getCount()).isEqualTo(theCount);
+  }
+
+  @Test
+  public void awaitWithTimeoutMillisIsCancelableAtBeginning() {
+    int theCount = 1;
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+    AtomicReference<Thread> theThread = new AtomicReference<>();
+    String cancelMessage = "cancel";
+
+    doThrow(new CancelException(cancelMessage) {}).when(stopper).checkCancelInProgress(any());
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> {
+      theThread.set(Thread.currentThread());
+      Throwable thrown = catchThrowable(() -> latch.await(TIMEOUT_MILLIS));
+      errorCollector.checkSucceeds(
+          () -> assertThat(thrown).isInstanceOf(CancelException.class).hasMessage(cancelMessage));
+    });
+
+    await().until(() -> theThread.get() != null);
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+    assertThat(latch.getCount()).isEqualTo(theCount);
+  }
+}
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
index ebce40f..bed8243 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -46,7 +47,7 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
  *
  * {@literal @}Test
  * public void doTest() throws Exception {
- *   Future<Void> result = executorServiceRule.runAsync(() -> {
+ *   Future&lt;Void&gt; result = executorServiceRule.runAsync(() -> {
  *     try {
  *       hangLatch.await();
  *     } catch (InterruptedException e) {
@@ -171,8 +172,11 @@ public class ExecutorServiceRule extends SerializableExternalResource {
    * @throws RejectedExecutionException if this task cannot be accepted for execution
    * @throws NullPointerException if command is null
    */
-  public void execute(Runnable command) {
-    executor.execute(command);
+  public void execute(ThrowingRunnable command) {
+    executor.submit((Callable<Void>) () -> {
+      command.run();
+      return null;
+    });
   }
 
   /**
@@ -205,8 +209,13 @@ public class ExecutorServiceRule extends SerializableExternalResource {
    * @throws RejectedExecutionException if the task cannot be scheduled for execution
    * @throws NullPointerException if the task is null
    */
-  public <T> Future<T> submit(Runnable task, T result) {
-    return executor.submit(task, result);
+  public <T> Future<T> submit(ThrowingRunnable task, T result) {
+    FutureTask<T> futureTask = new FutureTask<>(() -> {
+      task.run();
+      return result;
+    });
+    executor.submit(futureTask);
+    return futureTask;
   }
 
   /**
@@ -218,8 +227,13 @@ public class ExecutorServiceRule extends SerializableExternalResource {
    * @throws RejectedExecutionException if the task cannot be scheduled for execution
    * @throws NullPointerException if the task is null
    */
-  public Future<?> submit(Runnable task) {
-    return executor.submit(task);
+  public Future<Void> submit(ThrowingRunnable task) {
+    FutureTask<Void> futureTask = new FutureTask<>(() -> {
+      task.run();
+      return null;
+    });
+    executor.submit(futureTask);
+    return futureTask;
   }
 
   /**
@@ -294,6 +308,22 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   }
 
   /**
+   * This interface replaces {@link Runnable} in cases when execution of {@link #run()} method may
+   * throw exception.
+   *
+   * <p>
+   * Useful for capturing lambdas that throw exceptions.
+   */
+  @FunctionalInterface
+  public interface ThrowingRunnable {
+    /**
+     * @throws Exception The exception that may be thrown
+     * @see Runnable#run()
+     */
+    void run() throws Exception;
+  }
+
+  /**
    * Modified version of {@code java.util.concurrent.Executors$DefaultThreadFactory} that uses
    * a {@code Set<WeakReference<Thread>>} to track the {@code Thread}s in the factory's
    * {@code ThreadGroup} excluding subgroups.