You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/08/15 17:09:23 UTC
[geode] branch develop updated: GEODE-5438: Add rule for running
threads in tests
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5e790c8 GEODE-5438: Add rule for running threads in tests
5e790c8 is described below
commit 5e790c8b99d510525f0954a6d0f3bf10aa9478c1
Author: Helena Bales <hb...@pivotal.io>
AuthorDate: Wed Aug 15 10:09:16 2018 -0700
GEODE-5438: Add rule for running threads in tests
Signed-off-by Brian Rowe<br...@pivotal.io>
---
.../geode/test/junit/rules/ConcurrencyRule.java | 408 +++++++++++++++++++
.../test/junit/rules/ConcurrencyRuleTest.java | 440 +++++++++++++++++++++
2 files changed, 848 insertions(+)
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ConcurrencyRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ConcurrencyRule.java
new file mode 100644
index 0000000..948b2d4
--- /dev/null
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ConcurrencyRule.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.junit.rules;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import org.junit.rules.ErrorCollector;
+
+import org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;
+
+/**
+ * A rule for testing using multiple threads. This rule should not be used as a class rule. This
+ * rule accepts threads to be run, runs them in series or parallel, and throws exceptions if any of
+ * the threads threw an unexpected exception or returned an incorrect value.
+ *
+ * Basic Steps for Usage:
+ * 1. Declare the rule as a test rule with the @Rule annotation
+ * 2. Create a Callable, or create a Runnable and use toCallable(runnable) to convert it
+ * 3. Add the Callable to the rule using add(callable)
+ * 4. (Optional) Add expectations for the outcome of running the Callable (values or exceptions),
+ * and/or any repetition of threads (for N iterations or for a duration)
+ * 5. Run all submitted callables in series or in parallel
+ * 6. Before re-executing within the same test, use clear() to empty the list of callables and
+ * errors
+ *
+ * Example Usage:
+ *
+ * @Rule
+ * public ConcurrencyRule concurrencyRule = new ConcurrencyRule(); // step 1
+ *
+ * @Test
+ * public void testName() {
+ * Callable<String> c1 = () -> {
+ * return "some Value";
+ * }; // step 2
+ *
+ * concurrencyRule.add(c1).expectValue("some Value").repeatForIterations(3); // steps 3&4
+ * concurrencyRule.executeInParallel(); // step 5
+ * concurrencyRule.clear(); // step 6
+ * // keep using the rule as above, or ConcurrencyRule.after() will be called for cleanup
+ * }
+ *
+ */
+public class ConcurrencyRule extends SerializableExternalResource {
+
+ private final ExecutorService threadPool = Executors.newCachedThreadPool();
+ private final Collection<ConcurrentOperation> toInvoke;
+ private final Collection<Future<Void>> futures;
+
+ private ProtectedErrorCollector errorCollector;
+ private Duration timeout;
+
+ /**
+ * A default constructor that sets the timeout to a default of 30 seconds
+ */
+ public ConcurrencyRule() {
+ toInvoke = new ArrayList<>();
+ futures = new ArrayList<>();
+ timeout = Duration.ofSeconds(300);
+ errorCollector = new ProtectedErrorCollector();
+ }
+
+ /**
+ * A non-default constructor that sets the timeout to the given duration
+ *
+ * @param timeout the maximum duration that threads should execute, given that the submitted
+ * tasks respond to cancellation signals.
+ */
+ public ConcurrencyRule(Duration timeout) {
+ toInvoke = new ArrayList<>();
+ futures = new ArrayList<>();
+ this.timeout = timeout;
+ errorCollector = new ProtectedErrorCollector();
+ }
+
+ @Override
+ protected void after() {
+ clear();
+ stopThreadPool();
+ }
+
+
+ /**
+ * Adds a Callable to the concurrency rule to be run. Expectations for return values and thrown
+ * exceptions, as well as any repetition of the thread should be added using ConcurrentOperation.
+ *
+ * @param callable, a Callable to be run. If the Callable throws an exception that is not expected
+ * it will be thrown up to the test that the threads are run from.
+ * @return concurrentOperation, the ConcurrentOperation that has been added to the rule
+ */
+ public <T> ConcurrentOperation<T> add(Callable<T> callable) {
+ ConcurrentOperation<T> concurrentOperation = new ConcurrentOperation(callable);
+ toInvoke.add(concurrentOperation);
+
+ return concurrentOperation;
+ }
+
+
+ /**
+ * Runs all callables in the rule in parallel and fails if threads' conditions were not met. Each
+ * thread runs until timeout has been reached. This method will not return until all
+ * threads have completed or been cancelled.
+ *
+ * @throws InterruptedException if interrupted before timeout
+ * @throws RuntimeException with cause of MultipleFailureException with a list of failures
+ * including AssertionErrors for all threads whose expectations were not met (if there are
+ * multiple failures).
+ * @throws AssertionError if a single thread's expectations are not met
+ * @throws Exception if a thread throws an unexpected exception
+ */
+ public void executeInParallel() {
+ for (ConcurrentOperation op : toInvoke) {
+ futures.add(threadPool.submit(op));
+ }
+
+ awaitFutures();
+ errorCollector.verify();
+ }
+
+ /**
+ * Runs all callables in the rule in the order that they were added and fail if threads'
+ * conditions
+ * are not met. Each thread runs until timeout is reached. This method will not return until all
+ * threads have completed or been cancelled.
+ *
+ * @throws RuntimeException with cause of MultipleFailureException with a list of failures
+ * including AssertionErrors for all threads whose expectations were not met (if there are
+ * multiple failures).
+ * @throws AssertionError if a single thread's expectations are not met
+ * @throws Exception if a thread throws an unexpected exception
+ */
+ public void executeInSeries() {
+ for (ConcurrentOperation op : toInvoke) {
+ awaitFuture(threadPool.submit(op));
+ }
+
+ errorCollector.verify();
+ }
+
+ /**
+ * Clears the lists of callables, futures, and errors. Use between calls to execute methods to
+ * avoid rerunning and rethrowing callables from the previous executions.
+ */
+ public void clear() {
+ toInvoke.clear();
+ futures.clear();
+ errorCollector = new ProtectedErrorCollector();
+ }
+
+ /**
+ * Shuts down the thread pool. Does not need to be called if the rule's after is called
+ */
+ public void stopThreadPool() {
+ threadPool.shutdownNow();
+ }
+
+ /**
+ * Set the timeout for the threads. After the timeout is reached, the threads will be interrupted
+ * and will throw a CancellationException
+ */
+ public void setTimeout(Duration timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * Turns a Runnable into a Void Callable in order to submit it to the rule for execution
+ *
+ * @param runnable a Runnable to convert to a Callable
+ * @return a Callable with Void return type
+ */
+ public static Callable<Void> toCallable(Runnable runnable) {
+ return () -> {
+ runnable.run();
+ return null;
+ };
+ }
+
+ private void awaitFutures() {
+ for (Future<Void> future : futures) {
+ awaitFuture(future);
+ }
+
+ clearCompletedFutures();
+ }
+
+ private void awaitFuture(Future<Void> future) {
+ try {
+ future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ errorCollector.addError(e.getCause());
+ } catch (Exception e) {
+ errorCollector.addError(e);
+ }
+ }
+
+ private static Object readField(final Class targetClass, final Object targetInstance,
+ final String fieldName) {
+ try {
+ Field field = targetClass.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(targetInstance);
+ } catch (IllegalAccessException | NoSuchFieldException e) {
+ throw new Error(e);
+ }
+ }
+
+ private void clearCompletedFutures() {
+ futures.removeIf(future -> future.isCancelled() || future.isDone());
+ }
+
+ public static class ConcurrentOperation<T> implements Callable<Void> {
+ private final int DEFAULT_ITERATIONS = 1;
+ private final Duration DEFAULT_DURATION = Duration.ofSeconds(300);
+
+ private Callable<T> callable;
+ private int iterations;
+ private Throwable expectedException;
+ private T expectedValue;
+ private Duration duration;
+ private Class expectedExceptionType;
+
+ public ConcurrentOperation() {
+ callable = null;
+ iterations = DEFAULT_ITERATIONS;
+ duration = DEFAULT_DURATION;
+ expectedException = null;
+ expectedExceptionType = null;
+ expectedValue = null;
+ }
+
+ public ConcurrentOperation(Callable<T> toAdd) {
+ this.callable = toAdd;
+ iterations = DEFAULT_ITERATIONS;
+ duration = DEFAULT_DURATION;
+ expectedException = null;
+ expectedExceptionType = null;
+ expectedValue = null;
+ }
+
+ /**
+ * Sets a callable to be repeated the given number of times. If there is also an expected result
+ * for the callable, that expectation must be met for each iteration of the callable.
+ *
+ * @param iterations the number of times to run the callable
+ * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
+ */
+ public ConcurrentOperation repeatForIterations(int iterations) {
+ if (!duration.equals(DEFAULT_DURATION)) {
+ throw new IllegalArgumentException("Specify only Duration or Iterations");
+ }
+
+ this.iterations = iterations;
+ return this;
+ }
+
+ /**
+ * Sets a callable to be repeated for some duration. If there is also an expected result
+ * for the callable, that expectation must be met for each iteration of the callable. The
+ * callable will not be restarted after the duration has been met, however the current
+ * iteration will be allowed to continue until the timeout is reached.
+ *
+ * @param duration, the Duration for which to repeat the callable
+ * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
+ */
+ public ConcurrentOperation repeatForDuration(Duration duration) {
+ if (iterations != DEFAULT_ITERATIONS) {
+ throw new IllegalArgumentException("Specify only Duration or Iterations");
+ }
+
+ this.duration = duration;
+ return this;
+ }
+
+ /**
+ * Sets the expected result of running the thread to be an exception matching the given
+ * exception
+ *
+ * @param expectedException the expected exception. If the message is null, the message of the
+ * thrown exception will not be checked, however if the message is empty, the thrown
+ * exception
+ * must also have a null or empty message.
+ * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
+ */
+ public ConcurrentOperation expectException(Throwable expectedException) {
+ if (expectedExceptionType != null || expectedValue != null) {
+ throw new IllegalArgumentException("Specify only one expected outcome.");
+ }
+
+ this.expectedException = expectedException;
+ return this;
+ }
+
+ /**
+ * Sets the expected result of running the thread to be an exception that is an instance of the
+ * given class
+ *
+ * @param expectedExceptionType the class of the expected exception. Causes will not be checked.
+ * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
+ */
+ public ConcurrentOperation expectExceptionType(Class expectedExceptionType) {
+ if (expectedException != null || expectedValue != null) {
+ throw new IllegalArgumentException("Specify only one expected outcome.");
+ }
+
+ this.expectedExceptionType = expectedExceptionType;
+ return this;
+ }
+
+ /**
+ * Sets the expected result of running the thread to be a value matching the given value
+ *
+ * @param expectedValue the value expected to be returned from the thread. The value must
+ * implement equals
+ * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
+ */
+ public ConcurrentOperation expectValue(T expectedValue) {
+ if (expectedExceptionType != null || expectedException != null) {
+ throw new IllegalArgumentException("Specify only one expected outcome.");
+ }
+
+ this.expectedValue = expectedValue;
+ return this;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ Stopwatch timeRun = duration != DEFAULT_DURATION ? Stopwatch.createStarted() : null;
+ int numRuns = 0;
+
+ do {
+ numRuns++;
+ callAndValidate();
+ } while ((iterations != DEFAULT_ITERATIONS && numRuns < iterations) ||
+ (duration != DEFAULT_DURATION
+ && timeRun.elapsed(TimeUnit.SECONDS) <= duration.getSeconds()));
+ return null;
+ }
+
+ private void callAndValidate() throws Exception {
+ if (expectedValue != null) {
+ assertThat(this.callable.call()).isEqualTo(this.expectedValue);
+ } else if (expectedException != null) {
+ Throwable thrown = catchThrowable(() -> this.callable.call());
+ checkThrown(this.expectedException, thrown);
+ } else if (expectedExceptionType != null) {
+ Throwable thrown = catchThrowable(() -> this.callable.call());
+ assertThat(thrown).isInstanceOf(this.expectedExceptionType);
+ } else {
+ this.callable.call();
+ }
+ }
+
+ private void checkThrown(Throwable expected, Throwable actual) {
+ assertThat(actual).isInstanceOf(expected.getClass());
+
+ if (expected.getMessage() != null) {
+ assertThat(actual).hasMessage(expected.getMessage());
+ }
+
+ if (expected.getCause() != null) {
+ checkThrown(expected.getCause(), actual.getCause());
+ }
+ }
+ }
+
+ private static class ProtectedErrorCollector extends ErrorCollector {
+ @Override
+ protected void verify() {
+ try {
+ super.verify();
+ } catch (Error | RuntimeException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+
+ List<Throwable> getErrors() {
+ return (List<Throwable>) readField(ErrorCollector.class, this, "errors");
+ }
+ }
+}
diff --git a/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ConcurrencyRuleTest.java b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ConcurrencyRuleTest.java
new file mode 100644
index 0000000..282cdb5
--- /dev/null
+++ b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ConcurrencyRuleTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.test.junit.rules;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.model.MultipleFailureException;
+
+@RunWith(JUnitParamsRunner.class)
+public class ConcurrencyRuleTest {
+ private final AtomicBoolean invoked = new AtomicBoolean();
+ private final AtomicInteger iterations = new AtomicInteger(0);
+
+ private final int stopIteration = 2;
+ private final Integer expectedRetVal = Integer.valueOf(72);
+ private final Throwable expectedException =
+ new IllegalStateException("Oh boy, here I go testin' again");
+ private final IllegalStateException expectedExceptionWithCause =
+ new IllegalStateException("Oh boy, here I go testin' again");
+ {
+ expectedExceptionWithCause.initCause(new NullPointerException());
+ }
+
+ private final Callable<Integer> callWithRetVal = () -> {
+ invoked.set(Boolean.TRUE);
+ return Integer.valueOf(72);
+ };
+
+ private final Callable<Integer> callWithRetValAndRepeatCount = () -> {
+ iterations.incrementAndGet();
+ return Integer.valueOf(72);
+ };
+
+ private final Callable<Integer> callWithRetValAndRepeatCountAndOneWrongValue = () -> {
+ int currentIteration = iterations.incrementAndGet();
+ if (currentIteration == stopIteration) {
+ return Integer.valueOf(3);
+ }
+ return Integer.valueOf(72);
+ };
+
+ private final Callable<Void> callWithExceptionAndRepeatCount = () -> {
+ iterations.incrementAndGet();
+ throw new IllegalStateException("Oh boy, here I go testin' again");
+ };
+
+ private final Callable<Void> callWithOneExceptionAndRepeatCount = () -> {
+ int currentIteration = iterations.incrementAndGet();
+ if (currentIteration == stopIteration) {
+ throw new IllegalStateException("Oh boy, here I go testin' again");
+ }
+ return null;
+ };
+
+ @Rule
+ public ConcurrencyRule concurrencyRule = new ConcurrencyRule();
+
+ @Before
+ public void setUp() {
+ invoked.set(false);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectExceptionType(Execution execution) {
+ concurrencyRule.add(() -> {
+ throw new NullPointerException();
+ }).expectExceptionType(NullPointerException.class);
+ execution.execute(concurrencyRule);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectException(Execution execution) {
+ Throwable expected = new NullPointerException("my custom message");
+ concurrencyRule.add(() -> {
+ throw new NullPointerException("my custom message");
+ }).expectException(expected);
+ execution.execute(concurrencyRule);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectException_throwableInstanceWithCauses(Execution execution) {
+ Callable<?> callable = () -> {
+ NullPointerException cause = new NullPointerException();
+ IllegalStateException toThrow = new IllegalStateException("Oh boy, here I go testin' again");
+ toThrow.initCause(cause);
+ throw toThrow;
+ };
+
+ concurrencyRule.add(callable).expectException(expectedExceptionWithCause);
+ execution.execute(concurrencyRule);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectException_throwableInstanceWithCauses_failsIfCauseDoesNotMatch(
+ Execution execution) {
+ Callable<Void> callable = () -> {
+ throw new IllegalStateException("Oh boy, here I go testin' again");
+ };
+
+ concurrencyRule.add(callable).expectException(expectedExceptionWithCause);
+
+ assertThatThrownBy(() -> execution.execute(concurrencyRule))
+ .isInstanceOf(AssertionError.class)
+ .hasMessageContaining("Expecting actual not to be null");
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectException_throwableInstance_wrongMessage_fails(Execution execution) {
+ Callable<?> callable = () -> {
+ throw new NullPointerException("foo");
+ };
+
+ concurrencyRule.add(callable).expectException(new NullPointerException("bar"));
+ assertThatThrownBy(() -> execution.execute(concurrencyRule))
+ .isInstanceOf(AssertionError.class);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectException_throwableInstance_wrongClass_fails(Execution execution) {
+ Callable<?> callable = () -> {
+ throw new IllegalArgumentException("foo");
+ };
+
+ concurrencyRule.add(callable).expectException(new NullPointerException("foo"));
+ assertThatThrownBy(() -> execution.execute(concurrencyRule))
+ .isInstanceOf(AssertionError.class);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectNoException_withNoReturn(Execution execution) {
+ concurrencyRule.add(ConcurrencyRule.toCallable(() -> invoked.set(true)));
+ execution.execute(concurrencyRule);
+ assertThat(invoked.get()).isTrue();
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectNoException_withReturn(Execution execution) {
+ concurrencyRule.add(() -> {
+ invoked.set(true);
+ return true;
+ });
+ execution.execute(concurrencyRule);
+ assertThat(invoked.get()).isTrue();
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectValue(Execution execution) {
+ concurrencyRule.add(callWithRetVal).expectValue(expectedRetVal);
+ execution.execute(concurrencyRule);
+ assertThat(invoked.get()).isTrue();
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runAndExpectValue_failsForWrongValue(Execution execution) {
+ concurrencyRule.add(callWithRetVal).expectValue(Integer.valueOf(3));
+ assertThatThrownBy(() -> execution.execute(concurrencyRule))
+ .isInstanceOf(AssertionError.class);
+ assertThat(invoked.get()).isTrue();
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void repeatForIterations(Execution execution) {
+ int expectedIterations = 4;
+ this.iterations.set(0);
+
+ concurrencyRule.add(callWithRetValAndRepeatCount).repeatForIterations(4);
+ execution.execute(concurrencyRule);
+ assertThat(this.iterations.get()).isEqualTo(expectedIterations);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void repeatForIterationsAndExpectExceptionForEach_byExceptionClass(Execution execution) {
+ int expectedIterations = 4;
+ this.iterations.set(0);
+
+ concurrencyRule.add(callWithExceptionAndRepeatCount)
+ .expectExceptionType(expectedException.getClass()).repeatForIterations(4);
+ execution.execute(concurrencyRule);
+ assertThat(this.iterations.get()).isEqualTo(expectedIterations);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void repeatForIterationsAndExpectExceptionForEach_byExceptionInstance(
+ Execution execution) {
+ int expectedIteration = 4;
+ this.iterations.set(0);
+
+ concurrencyRule.add(callWithExceptionAndRepeatCount).expectException(expectedException)
+ .repeatForIterations(4);
+ execution.execute(concurrencyRule);
+ assertThat(this.iterations.get()).isEqualTo(expectedIteration);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void repeatForIterationsAndExpectValueForEach(Execution execution) {
+ int ExpectedIterations = 4;
+ this.iterations.set(0);
+
+ concurrencyRule.add(callWithRetValAndRepeatCount).repeatForIterations(4)
+ .expectValue(expectedRetVal);
+ execution.execute(concurrencyRule);
+ assertThat(this.iterations.get()).isEqualTo(ExpectedIterations);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void repeatForIterationsAndExpectValueForEach_failsWithOneWrongValue(Execution execution) {
+ int expectedIterations = 4;
+ this.iterations.set(0);
+
+ concurrencyRule.add(callWithRetValAndRepeatCountAndOneWrongValue).expectValue(expectedRetVal)
+ .repeatForIterations(expectedIterations);
+ assertThatThrownBy(() -> execution.execute(concurrencyRule)).isInstanceOf(AssertionError.class);
+ assertThat(this.iterations.get()).isEqualTo(stopIteration);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void repeatForDuration(Execution execution) {
+ Duration duration = Duration.ofMillis(200);
+ this.iterations.set(0);
+
+ concurrencyRule.add(callWithRetValAndRepeatCount).repeatForDuration(duration);
+ Awaitility.await("Execution did not respect given duration").atMost(2, TimeUnit.MINUTES)
+ .until(() -> {
+ execution.execute(concurrencyRule);
+ return true;
+ });
+ assertThat(iterations.get()).isGreaterThan(1);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void deadlocksGetResolved(Execution execution) {
+ final AtomicBoolean lock1 = new AtomicBoolean();
+ final AtomicBoolean lock2 = new AtomicBoolean();
+
+ concurrencyRule.add(() -> {
+ Awaitility.await().until(() -> lock2.equals(Boolean.TRUE));
+ lock1.set(true);
+ return null;
+ });
+
+ concurrencyRule.add(() -> {
+ Awaitility.await().until(() -> lock1.equals(Boolean.TRUE));
+ lock2.set(true);
+ return null;
+ });
+
+ concurrencyRule.setTimeout(Duration.ofSeconds(1));
+
+ Throwable thrown = catchThrowable(() -> execution.execute(concurrencyRule));
+ Throwable cause = thrown.getCause();
+
+ assertThat(thrown).isInstanceOf(RuntimeException.class);
+ assertThat(cause).isInstanceOf(MultipleFailureException.class);
+ assertThat(((MultipleFailureException) cause).getFailures())
+ .hasSize(2)
+ .hasOnlyElementsOfType(TimeoutException.class);
+ }
+
+ @Test
+ public void clearEmptiesThreadsToRun() {
+ final AtomicBoolean b1 = new AtomicBoolean(Boolean.FALSE);
+ final AtomicBoolean b2 = new AtomicBoolean(Boolean.FALSE);
+ final AtomicBoolean b3 = new AtomicBoolean(Boolean.FALSE);
+ final AtomicBoolean b4 = new AtomicBoolean(Boolean.FALSE);
+
+ Callable c1 = () -> {
+ b1.set(true);
+ return null;
+ };
+ Callable c2 = () -> {
+ b2.set(true);
+ return null;
+ };
+ Callable c3 = () -> {
+ b3.set(true);
+ return null;
+ };
+ Callable c4 = () -> {
+ b4.set(true);
+ return null;
+ };
+
+ // submit some threads and check they did what they're supposed to
+ concurrencyRule.add(c1);
+ concurrencyRule.add(c2);
+ concurrencyRule.add(c3).expectExceptionType(IllegalArgumentException.class);
+ Throwable thrown = catchThrowable(() -> concurrencyRule.executeInParallel());
+
+ assertThat(thrown).isInstanceOf(AssertionError.class);
+ assertThat(b1).isTrue();
+ assertThat(b2).isTrue();
+ assertThat(b3).isTrue();
+ assertThat(b4).isFalse();
+
+ // reset the booleans
+ b1.set(false);
+ b2.set(false);
+ b3.set(false);
+ b4.set(false);
+
+ // empty the list
+ concurrencyRule.clear();
+
+ // submit some more threads and check that ONLY those were executed
+ concurrencyRule.add(c3);
+ concurrencyRule.add(c4);
+
+ assertThat(catchThrowable(() -> concurrencyRule.executeInParallel())).isNull();
+ assertThat(b1).isFalse();
+ assertThat(b2).isFalse();
+ assertThat(b3).isTrue();
+ assertThat(b4).isTrue();
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void runManyThreads(Execution execution) {
+ Callable<Void> exceptionCallable = () -> {
+ throw new IOException("foo");
+ };
+
+ Callable<String> valueCallable = () -> {
+ return "successful value";
+ };
+
+ Callable<Void> setInvokedCallable = () -> {
+ invoked.set(true);
+ return null;
+ };
+
+ concurrencyRule.add(exceptionCallable).expectException(new NullPointerException("foo"));
+ concurrencyRule.add(exceptionCallable).expectException(new IOException("foo"));
+ concurrencyRule.add(valueCallable).expectValue("successful value");
+ concurrencyRule.add(valueCallable).expectValue("wrong value");
+ concurrencyRule.add(setInvokedCallable);
+ concurrencyRule.add(exceptionCallable);
+
+ Throwable thrown = catchThrowable(() -> execution.execute(concurrencyRule));
+ List<Throwable> errors = ((MultipleFailureException) thrown.getCause()).getFailures();
+
+ assertThat(errors).hasSize(3);
+ assertThat(errors.get(0)).isInstanceOf(AssertionError.class)
+ .hasMessageContaining(IOException.class.getName());
+ assertThat(errors.get(1)).isInstanceOf(AssertionError.class)
+ .hasMessageContaining("[successful] value")
+ .hasMessageContaining("[wrong] value");
+ assertThat(errors.get(2)).hasMessageContaining("foo")
+ .isInstanceOf(IOException.class);
+ }
+
+ @Test
+ @Parameters({"EXECUTE_IN_SERIES", "EXECUTE_IN_PARALLEL"})
+ public void timeoutValueIsRespected(Execution execution) {
+
+ Callable<Void> c1 = () -> {
+ Thread.sleep(5000);
+ return null;
+ };
+
+ concurrencyRule.setTimeout(Duration.ofSeconds(1));
+ concurrencyRule.add(c1);
+ concurrencyRule.add(c1);
+ Awaitility.await("timeout is respected").atMost(3, TimeUnit.SECONDS).until(() -> {
+ Throwable thrown = catchThrowable(() -> execution.execute(concurrencyRule));
+ assertThat(((MultipleFailureException) thrown.getCause()).getFailures()).hasSize(2)
+ .hasOnlyElementsOfType(TimeoutException.class);
+ return true;
+ });
+ }
+
+ @SuppressWarnings("unused")
+ private enum Execution {
+ EXECUTE_IN_SERIES(concurrencyRule -> {
+ concurrencyRule.executeInSeries();
+ }),
+ EXECUTE_IN_PARALLEL(concurrencyRule -> {
+ concurrencyRule.executeInParallel();
+ });
+
+ private final Consumer<ConcurrencyRule> execution;
+
+ Execution(Consumer<ConcurrencyRule> execution) {
+ this.execution = execution;
+ }
+
+ void execute(ConcurrencyRule concurrencyRule) {
+ execution.accept(concurrencyRule);
+ }
+ }
+}