You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/04/09 16:34:38 UTC
[geode] branch develop updated: GEODE-6603: Create StoppableCountDownLatch unit tests (#3413)
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f7ae070 GEODE-6603: Create StoppableCountDownLatch unit tests (#3413)
f7ae070 is described below
commit f7ae0704415e9fcc26b2d24d4dd1e0dcb5f8d06d
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue Apr 9 09:34:27 2019 -0700
GEODE-6603: Create StoppableCountDownLatch unit tests (#3413)
* Create StoppableCountDownLatchTest and fixup any issues uncovered by
testing.
* Remove unused class StoppableCountDownOrUpLatch.
* Improve ExecutorServiceRule for StoppableCountDownLatchTest.
---
.../cache/BucketCreationCrashRegressionTest.java | 4 +-
.../util/concurrent/StoppableCountDownLatch.java | 100 +++++---
.../concurrent/StoppableCountDownOrUpLatch.java | 210 ---------------
.../concurrent/StoppableCountDownLatchTest.java | 283 +++++++++++++++++++++
.../test/junit/rules/ExecutorServiceRule.java | 44 +++-
5 files changed, 387 insertions(+), 254 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
index 9f3316a..3c97f97 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTE
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.DistributedTestUtils.crashDistributedSystem;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
@@ -172,7 +173,8 @@ public class BucketCreationCrashRegressionTest implements Serializable {
.isInstanceOf(RMIException.class)
.hasCauseInstanceOf(DistributedSystemDisconnectedException.class);
- assertThat(server2.invoke(() -> getBucketList())).containsExactly(3);
+ await()
+ .untilAsserted(() -> assertThat(server2.invoke(() -> getBucketList())).containsExactly(3));
// This shouldn't hang, because the bucket creation should finish.
server2.invoke(() -> putData(3, 4, "a"));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java
index 6336404..6c6576a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatch.java
@@ -14,11 +14,14 @@
*/
package org.apache.geode.internal.util.concurrent;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.geode.CancelCriterion;
-import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
/**
@@ -26,75 +29,100 @@ import org.apache.geode.internal.Assert;
*/
public class StoppableCountDownLatch {
- /**
- * This is how often waiters will wake up to check for cancellation
- */
- static final long RETRY_TIME = Long
- .getLong(DistributionConfig.GEMFIRE_PREFIX + "stoppable-retry-interval", 2000).longValue();
+ static final String RETRY_TIME_MILLIS_PROPERTY = GEMFIRE_PREFIX + "stoppable-retry-interval";
+ static final long RETRY_TIME_MILLIS_DEFAULT = 2000;
+ private final CountDownLatch delegate;
- /**
- * The underlying latch
- */
- private final CountDownLatch latch;
+ private final CancelCriterion stopper;
/**
- * The cancellation criterion
+ * This is how often waiters will wake up to check for cancellation
*/
- private final CancelCriterion stopper;
+ private final long retryIntervalNanos;
+
+ private final NanoTimer nanoTimer;
/**
+ * @param stopper the CancelCriterion to check before awaiting
* @param count the number of times {@link #countDown} must be invoked before threads can pass
* through {@link #await()}
*
* @throws IllegalArgumentException if {@code count} is negative
*/
- public StoppableCountDownLatch(CancelCriterion stopper, int count) {
+ public StoppableCountDownLatch(final CancelCriterion stopper, final int count) {
+ this(stopper, count,
+ MILLISECONDS.toNanos(Long.getLong(RETRY_TIME_MILLIS_PROPERTY, RETRY_TIME_MILLIS_DEFAULT)),
+ System::nanoTime);
+ }
+
+ StoppableCountDownLatch(final CancelCriterion stopper, final int count,
+ final long retryIntervalNanos, final NanoTimer nanoTimer) {
Assert.assertTrue(stopper != null);
- this.latch = new CountDownLatch(count);
+ delegate = new CountDownLatch(count);
this.stopper = stopper;
+ this.retryIntervalNanos = retryIntervalNanos;
+ this.nanoTimer = nanoTimer;
}
public void await() throws InterruptedException {
- for (;;) {
+ do {
stopper.checkCancelInProgress(null);
- if (latch.await(RETRY_TIME, TimeUnit.MILLISECONDS)) {
- break;
- }
+ } while (!delegate.await(retryIntervalNanos, NANOSECONDS));
+ }
+
+ public boolean await(final long timeout, final TimeUnit unit) throws InterruptedException {
+ stopper.checkCancelInProgress(null);
+ long timeoutNanos = unit.toNanos(timeout);
+ if (timeoutNanos > retryIntervalNanos) {
+ return awaitWithCheck(timeoutNanos);
}
+ return delegate.await(timeoutNanos, NANOSECONDS);
}
/**
- * @param msTimeout how long to wait in milliseconds
+ * @param timeoutMillis how long to wait in milliseconds
*
* @return true if it was unlatched
*/
- public boolean await(long msTimeout) throws InterruptedException {
+ public boolean await(final long timeoutMillis) throws InterruptedException {
stopper.checkCancelInProgress(null);
- return latch.await(msTimeout, TimeUnit.MILLISECONDS);
- }
-
- public boolean await(final long timeout, final TimeUnit unit) throws InterruptedException {
- stopper.checkCancelInProgress(null);
- return latch.await(timeout, unit);
+ long timeoutNanos = MILLISECONDS.toNanos(timeoutMillis);
+ if (timeoutNanos > retryIntervalNanos) {
+ return awaitWithCheck(timeoutNanos);
+ }
+ return delegate.await(timeoutNanos, NANOSECONDS);
}
- public synchronized void countDown() {
- latch.countDown();
+ public void countDown() {
+ delegate.countDown();
}
- /**
- * @return the current count
- */
public long getCount() {
- return latch.getCount();
+ return delegate.getCount();
}
- /**
- * @return a string identifying this latch, as well as its state
- */
@Override
public String toString() {
- return "(Stoppable) " + latch.toString();
+ return "(Stoppable) " + delegate;
+ }
+
+ long retryIntervalNanos() {
+ return retryIntervalNanos;
+ }
+
+ /**
+  * Waits up to {@code timeoutNanos} for the latch to open, waking at least every
+  * {@code retryIntervalNanos} to re-check the {@link CancelCriterion}.
+  *
+  * <p>
+  * Each individual wait is capped at the time still remaining, so the overall wait does not run
+  * past the requested timeout by up to one full retry interval.
+  *
+  * @param timeoutNanos the maximum total time to wait, in nanoseconds
+  *
+  * @return true if it was unlatched, false if the timeout elapsed first
+  *
+  * @throws InterruptedException if the current thread is interrupted while waiting
+  */
+ private boolean awaitWithCheck(final long timeoutNanos) throws InterruptedException {
+   final long startNanos = nanoTimer.nanoTime();
+   long remainingNanos = timeoutNanos;
+   // do-while: the stopper is checked and the latch polled at least once, whatever the timeout
+   do {
+     stopper.checkCancelInProgress(null);
+     if (delegate.await(Math.min(retryIntervalNanos, remainingNanos), NANOSECONDS)) {
+       return true;
+     }
+     remainingNanos = timeoutNanos - (nanoTimer.nanoTime() - startNanos);
+   } while (remainingNanos > 0);
+   return false;
+ }
+
+ @FunctionalInterface
+ interface NanoTimer {
+ long nanoTime();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownOrUpLatch.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownOrUpLatch.java
deleted file mode 100644
index c860139..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/StoppableCountDownOrUpLatch.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.util.concurrent;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.Assert;
-
-/**
- * Extends the CountDownLatch with the ability to also countUp.
- * <p>
- * Based on the original Doug Lea backport implementation of CountDownLatch.
- *
- * @see java.util.concurrent.CountDownLatch
- */
-public class StoppableCountDownOrUpLatch {
-
- private int count_;
-
- /**
- * The cancellation criterion
- */
- private CancelCriterion stopper;
-
- /**
- * Constructs a <code>CountDownLatch</code> initialized with the given count.
- *
- * @param count the number of times {@link #countDown} must be invoked before threads can pass
- * through {@link #await()}
- * @throws IllegalArgumentException if <code>count</count> is negative
- */
- public StoppableCountDownOrUpLatch(CancelCriterion stopper, int count) {
- Assert.assertTrue(stopper != null);
- if (count < 0)
- throw new IllegalArgumentException(
- "count < 0");
- this.stopper = stopper;
- this.count_ = count;
- }
-
- /**
- * Causes the current thread to wait until the latch has counted down to zero, unless the thread
- * is {@linkplain Thread#interrupt interrupted}.
- *
- * <p>
- * If the current count is zero then this method returns immediately.
- *
- * <p>
- * If the current count is greater than zero then the current thread becomes disabled for thread
- * scheduling purposes and lies dormant until one of two things happen:
- * <ul>
- * <li>The count reaches zero due to invocations of the {@link #countDown} method; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current thread.
- * </ul>
- *
- * <p>
- * If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
- * </ul>
- * then {@link InterruptedException} is thrown and the current thread's interrupted status is
- * cleared.
- *
- * @throws InterruptedException if the current thread is interrupted while waiting
- */
- public void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // Modified to use inner primitive repeatedly, checking
- // for cancellation
- for (;;) {
- stopper.checkCancelInProgress(null);
- if (await(StoppableCountDownLatch.RETRY_TIME))
- break;
- }
- }
-
- private static final long NANOS_PER_MS = 1000000;
-
- /**
- * Causes the current thread to wait until the latch has counted down to zero, unless the thread
- * is {@linkplain Thread#interrupt interrupted}, or the specified waiting time elapses.
- *
- * <p>
- * If the current count is zero then this method returns immediately with the value
- * <code>true</code>.
- *
- * <p>
- * If the current count is greater than zero then the current thread becomes disabled for thread
- * scheduling purposes and lies dormant until one of three things happen:
- * <ul>
- * <li>The count reaches zero due to invocations of the {@link #countDown} method; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current thread; or
- * <li>The specified waiting time elapses.
- * </ul>
- *
- * <p>
- * If the count reaches zero then the method returns with the value <code>true</code>.
- *
- * <p>
- * If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
- * </ul>
- * then {@link InterruptedException} is thrown and the current thread's interrupted status is
- * cleared.
- *
- * <p>
- * If the specified waiting time elapses then the value <code>false</code> is returned. If the
- * time is less than or equal to zero, the method will not wait at all.
- *
- * @param msTimeout the maximum time to wait in milliseconds
- * @return <code>true</code> if the count reached zero and <code>false</code> if the waiting time
- * elapsed before the count reached zero
- * @throws InterruptedException if the current thread is interrupted while waiting
- */
- public boolean await(long msTimeout) throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- long nanos = msTimeout * NANOS_PER_MS; // millis to nanos
- synchronized (this) {
- if (count_ <= 0)
- return true;
- else if (nanos <= 0)
- return false;
- else {
- long deadline = System.nanoTime() + nanos;
- for (;;) {
- stopper.checkCancelInProgress(null);
- wait(nanos / NANOS_PER_MS, (int) (nanos % NANOS_PER_MS));
- if (count_ <= 0)
- return true;
- else {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0)
- return false;
- }
- }
- }
- }
- }
-
- /**
- * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
- *
- * <p>
- * If the current count is greater than zero then it is decremented. If the new count is zero then
- * all waiting threads are re-enabled for thread scheduling purposes.
- *
- * <p>
- * If the current count equals zero then nothing happens.
- */
- public synchronized void countDown() {
- if (count_ == 0)
- return;
- if (--count_ == 0)
- notifyAll();
- }
-
- /**
- * Returns the current count.
- *
- * <p>
- * This method is typically used for debugging and testing purposes.
- *
- * @return the current count
- */
- public long getCount() {
- return count_;
- }
-
- /**
- * Returns a string identifying this latch, as well as its state. The state, in brackets, includes
- * the String <code>"Count ="</code> followed by the current count.
- *
- * @return a string identifying this latch, as well as its state
- */
- @Override
- public String toString() {
- return super.toString() + "[Count = " + getCount() + "]";
- }
-
- /**
- * [GemStone addition]
- */
- public synchronized void countUp() {
- this.count_++;
- }
-
- /**
- * [GemStone addition]
- */
- public synchronized long getCountSync() {
- return getCount();
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatchTest.java b/geode-core/src/test/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatchTest.java
new file mode 100644
index 0000000..9419e11
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/util/concurrent/StoppableCountDownLatchTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.util.concurrent;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.geode.internal.util.concurrent.StoppableCountDownLatch.RETRY_TIME_MILLIS_DEFAULT;
+import static org.apache.geode.internal.util.concurrent.StoppableCountDownLatch.RETRY_TIME_MILLIS_PROPERTY;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.rules.ErrorCollector;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class StoppableCountDownLatchTest {
+
+ private static final long TIMEOUT_MILLIS = getTimeout().getValueInMS();
+
+ private CancelCriterion stopper;
+
+ @Rule
+ public ErrorCollector errorCollector = new ErrorCollector();
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ @Rule
+ public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+ @Before
+ public void setUp() {
+ stopper = mock(CancelCriterion.class);
+ }
+
+ @Test
+ public void defaultRetryIntervalNanosIsTwoSeconds() {
+ long twoSeconds = 2;
+ StoppableCountDownLatch latch = new StoppableCountDownLatch(stopper, 1);
+
+ assertThat(NANOSECONDS.toSeconds(latch.retryIntervalNanos()))
+ .isEqualTo(MILLISECONDS.toSeconds(RETRY_TIME_MILLIS_DEFAULT))
+ .isEqualTo(twoSeconds);
+ }
+
+ @Test
+ public void defaultRetryIntervalNanosIsOverriddenBySystemProperty() {
+ long theRetryTimeMillis = 42;
+ System.setProperty(RETRY_TIME_MILLIS_PROPERTY, String.valueOf(theRetryTimeMillis));
+
+ StoppableCountDownLatch latch = new StoppableCountDownLatch(stopper, 1);
+
+ assertThat(NANOSECONDS.toMillis(latch.retryIntervalNanos())).isEqualTo(theRetryTimeMillis);
+ }
+
+ @Test
+ public void awaitReturnsAfterCountDown() {
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+
+ Future<Void> latchFuture = executorServiceRule.submit(() -> latch.await());
+
+ latch.countDown();
+
+ await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+ }
+
+ @Test
+ public void awaitIsInterruptible() {
+ int theCount = 1;
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+ AtomicReference<Thread> theThread = new AtomicReference<>();
+
+ Future<Void> latchFuture = executorServiceRule.submit(() -> {
+ theThread.set(Thread.currentThread());
+ Throwable thrown = catchThrowable(() -> latch.await());
+ errorCollector
+ .checkSucceeds(() -> assertThat(thrown).isInstanceOf(InterruptedException.class));
+ });
+
+ await().until(() -> theThread.get() != null);
+
+ theThread.get().interrupt();
+
+ await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+ assertThat(latch.getCount()).isEqualTo(theCount);
+ }
+
+ @Test
+ public void awaitIsCancelable() {
+ int theCount = 1;
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+ AtomicReference<Thread> theThread = new AtomicReference<>();
+ String cancelMessage = "cancel";
+
+ doNothing()
+ .doThrow(new CancelException(cancelMessage) {})
+ .when(stopper).checkCancelInProgress(any());
+
+ Future<Void> latchFuture = executorServiceRule.submit(() -> {
+ theThread.set(Thread.currentThread());
+ Throwable thrown = catchThrowable(() -> latch.await());
+ errorCollector.checkSucceeds(
+ () -> assertThat(thrown).isInstanceOf(CancelException.class).hasMessage(cancelMessage));
+ });
+
+ await().until(() -> theThread.get() != null);
+
+ await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+ assertThat(latch.getCount()).isEqualTo(theCount);
+ }
+
+ @Test
+ public void awaitWithTimeoutAndTimeUnitReturnsTrueAfterCountDown() throws Exception {
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+
+ Future<Boolean> latchFuture =
+ executorServiceRule.submit(() -> latch.await(TIMEOUT_MILLIS, MILLISECONDS));
+
+ latch.countDown();
+
+ assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isTrue();
+ }
+
+  /**
+   * Verifies that {@code await(timeout, unit)} returns false when the latch is never counted
+   * down, and that at least the requested timeout has elapsed by then.
+   */
+  @Test
+  public void awaitWithTimeoutAndTimeUnitReturnsFalseAfterTimeout() throws Exception {
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+    long theTimeoutMillis = 2;
+    long startNanos = System.nanoTime();
+
+    Future<Boolean> latchFuture =
+        executorServiceRule.submit(() -> latch.await(theTimeoutMillis, MILLISECONDS));
+
+    assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isFalse();
+    // elapsed time is in nanoseconds, so the timeout must be converted before comparing
+    assertThat(System.nanoTime() - startNanos)
+        .isGreaterThanOrEqualTo(MILLISECONDS.toNanos(theTimeoutMillis));
+  }
+
+ @Test
+ public void awaitWithTimeoutAndTimeUnitIsInterruptible() {
+ int theCount = 1;
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+ AtomicReference<Thread> theThread = new AtomicReference<>();
+
+ Future<Void> latchFuture = executorServiceRule.submit(() -> {
+ theThread.set(Thread.currentThread());
+ Throwable thrown = catchThrowable(() -> latch.await(TIMEOUT_MILLIS, MILLISECONDS));
+ errorCollector
+ .checkSucceeds(() -> assertThat(thrown).isInstanceOf(InterruptedException.class));
+ });
+
+ await().until(() -> theThread.get() != null);
+
+ theThread.get().interrupt();
+
+ await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+ assertThat(latch.getCount()).isEqualTo(theCount);
+ }
+
+ @Test
+ public void awaitWithTimeoutAndTimeUnitIsCancelableAtBeginning() {
+ int theCount = 1;
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+ AtomicReference<Thread> theThread = new AtomicReference<>();
+ String cancelMessage = "cancel";
+
+ doThrow(new CancelException(cancelMessage) {}).when(stopper).checkCancelInProgress(any());
+
+ Future<Void> latchFuture = executorServiceRule.submit(() -> {
+ theThread.set(Thread.currentThread());
+ Throwable thrown = catchThrowable(() -> latch.await(TIMEOUT_MILLIS, MILLISECONDS));
+ errorCollector.checkSucceeds(
+ () -> assertThat(thrown).isInstanceOf(CancelException.class).hasMessage(cancelMessage));
+ });
+
+ await().until(() -> theThread.get() != null);
+
+ await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+ assertThat(latch.getCount()).isEqualTo(theCount);
+ }
+
+ @Test
+ public void awaitWithTimeoutMillisReturnsTrueAfterCountDown() throws Exception {
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+
+ Future<Boolean> latchFuture = executorServiceRule.submit(() -> latch.await(TIMEOUT_MILLIS));
+
+ latch.countDown();
+
+ assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isTrue();
+ }
+
+  /**
+   * Verifies that {@code await(timeoutMillis)} returns false when the latch is never counted
+   * down, and that at least the requested timeout has elapsed by then.
+   */
+  @Test
+  public void awaitWithTimeoutMillisReturnsFalseAfterTimeout() throws Exception {
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, 1, MILLISECONDS.toNanos(2), System::nanoTime);
+    long theTimeoutMillis = 2;
+    long startNanos = System.nanoTime();
+
+    Future<Boolean> latchFuture = executorServiceRule.submit(() -> latch.await(theTimeoutMillis));
+
+    assertThat(latchFuture.get(TIMEOUT_MILLIS, MILLISECONDS)).isFalse();
+    // elapsed time is in nanoseconds, so the timeout must be converted before comparing
+    assertThat(System.nanoTime() - startNanos)
+        .isGreaterThanOrEqualTo(MILLISECONDS.toNanos(theTimeoutMillis));
+  }
+
+  /**
+   * Verifies that a thread blocked in {@code await(timeoutMillis)} is released with an
+   * {@link InterruptedException} when interrupted, and that the latch count is left unchanged.
+   */
+  @Test
+  public void awaitWithTimeoutMillisIsInterruptible() {
+    int theCount = 1;
+    StoppableCountDownLatch latch =
+        new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+    AtomicReference<Thread> theThread = new AtomicReference<>();
+
+    Future<Void> latchFuture = executorServiceRule.submit(() -> {
+      theThread.set(Thread.currentThread());
+      // TIMEOUT_MILLIS is the same value as getTimeout().getValueInMS(); use the shared constant
+      Throwable thrown = catchThrowable(() -> latch.await(TIMEOUT_MILLIS));
+      errorCollector
+          .checkSucceeds(() -> assertThat(thrown).isInstanceOf(InterruptedException.class));
+    });
+
+    // wait until the worker has published itself so there is a thread to interrupt
+    await().until(() -> theThread.get() != null);
+
+    theThread.get().interrupt();
+
+    await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+    assertThat(latch.getCount()).isEqualTo(theCount);
+  }
+
+ @Test
+ public void awaitWithTimeoutMillisIsCancelableAtBeginning() {
+ int theCount = 1;
+ StoppableCountDownLatch latch =
+ new StoppableCountDownLatch(stopper, theCount, MILLISECONDS.toNanos(2), System::nanoTime);
+ AtomicReference<Thread> theThread = new AtomicReference<>();
+ String cancelMessage = "cancel";
+
+ doThrow(new CancelException(cancelMessage) {}).when(stopper).checkCancelInProgress(any());
+
+ Future<Void> latchFuture = executorServiceRule.submit(() -> {
+ theThread.set(Thread.currentThread());
+ Throwable thrown = catchThrowable(() -> latch.await(TIMEOUT_MILLIS));
+ errorCollector.checkSucceeds(
+ () -> assertThat(thrown).isInstanceOf(CancelException.class).hasMessage(cancelMessage));
+ });
+
+ await().until(() -> theThread.get() != null);
+
+ await().untilAsserted(() -> assertThat(latchFuture.isDone()).isTrue());
+ assertThat(latch.getCount()).isEqualTo(theCount);
+ }
+}
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
index ebce40f..bed8243 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -46,7 +47,7 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
*
* {@literal @}Test
* public void doTest() throws Exception {
- * Future<Void> result = executorServiceRule.runAsync(() -> {
+ * Future<Void> result = executorServiceRule.runAsync(() -> {
* try {
* hangLatch.await();
* } catch (InterruptedException e) {
@@ -171,8 +172,11 @@ public class ExecutorServiceRule extends SerializableExternalResource {
* @throws RejectedExecutionException if this task cannot be accepted for execution
* @throws NullPointerException if command is null
*/
- public void execute(Runnable command) {
- executor.execute(command);
+ public void execute(ThrowingRunnable command) {
+ executor.submit((Callable<Void>) () -> {
+ command.run();
+ return null;
+ });
}
/**
@@ -205,8 +209,13 @@ public class ExecutorServiceRule extends SerializableExternalResource {
* @throws RejectedExecutionException if the task cannot be scheduled for execution
* @throws NullPointerException if the task is null
*/
- public <T> Future<T> submit(Runnable task, T result) {
- return executor.submit(task, result);
+ public <T> Future<T> submit(ThrowingRunnable task, T result) {
+ FutureTask<T> futureTask = new FutureTask<>(() -> {
+ task.run();
+ return result;
+ });
+ executor.submit(futureTask);
+ return futureTask;
}
/**
@@ -218,8 +227,13 @@ public class ExecutorServiceRule extends SerializableExternalResource {
* @throws RejectedExecutionException if the task cannot be scheduled for execution
* @throws NullPointerException if the task is null
*/
- public Future<?> submit(Runnable task) {
- return executor.submit(task);
+ public Future<Void> submit(ThrowingRunnable task) {
+ FutureTask<Void> futureTask = new FutureTask<>(() -> {
+ task.run();
+ return null;
+ });
+ executor.submit(futureTask);
+ return futureTask;
}
/**
@@ -294,6 +308,22 @@ public class ExecutorServiceRule extends SerializableExternalResource {
}
/**
+ * This interface replaces {@link Runnable} in cases when execution of {@link #run()} method may
+ * throw exception.
+ *
+ * <p>
+ * Useful for capturing lambdas that throw exceptions.
+ */
+ @FunctionalInterface
+ public interface ThrowingRunnable {
+ /**
+ * @throws Exception The exception that may be thrown
+ * @see Runnable#run()
+ */
+ void run() throws Exception;
+ }
+
+ /**
* Modified version of {@code java.util.concurrent.Executors$DefaultThreadFactory} that uses
* a {@code Set<WeakReference<Thread>>} to track the {@code Thread}s in the factory's
* {@code ThreadGroup} excluding subgroups.