You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/01/14 09:00:09 UTC
[james-project] 01/02: JAMES-3025 introduce randomlyDistributed
operations in ConcurrentTestRunner
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7bd038358341b4cabf70b4baa41f3dfc68cc6786
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jan 9 11:45:41 2020 +0100
JAMES-3025 introduce randomlyDistributed operations in ConcurrentTestRunner
---
.../util/concurrency/ConcurrentTestRunner.java | 27 ++++++-
.../util/concurrency/ConcurrentTestRunnerTest.java | 88 ++++++++++++++++++++++
2 files changed, 114 insertions(+), 1 deletion(-)
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index ed1e2bd..a47fe94 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -25,6 +25,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -39,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-
import reactor.core.publisher.Mono;
public class ConcurrentTestRunner implements Closeable {
@@ -53,6 +53,31 @@ public class ConcurrentTestRunner implements Closeable {
default RequireThreadCount reactorOperation(ReactorOperation reactorOperation) {
return operation(reactorOperation.blocking());
}
+
+ default RequireThreadCount randomlyDistributedOperations(ConcurrentOperation firstOperation, ConcurrentOperation... operations) {
+ Random random = createReproductibleRandom();
+ ConcurrentOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstOperation, operations).execute(threadNumber, step);
+ return operation(aggregateOperation);
+ }
+
+ default RequireThreadCount randomlyDistributedReactorOperations(ReactorOperation firstReactorOperation, ReactorOperation... reactorOperations) {
+ Random random = createReproductibleRandom();
+ ReactorOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstReactorOperation, reactorOperations).execute(threadNumber, step);
+ return reactorOperation(aggregateOperation);
+ }
+
+ default Random createReproductibleRandom() {
+ return new Random(2134);
+ }
+
+ default <OperationT> OperationT selectRandomOperation(Random random, OperationT firstReactorOperation, OperationT... reactorOperations) {
+ int whichAction = random.nextInt(reactorOperations.length + 1);
+ if (whichAction == 0) {
+ return firstReactorOperation;
+ } else {
+ return reactorOperations[whichAction - 1];
+ }
+ }
}
@FunctionalInterface
diff --git a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
index b40077e..39c1d20 100644
--- a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
@@ -26,12 +26,19 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
+import java.util.IntSummaryStatistics;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
class ConcurrentTestRunnerTest {
private static final ConcurrentTestRunner.ConcurrentOperation NOOP = (threadNumber, step) -> { };
private static final Duration DEFAULT_AWAIT_TIME = Duration.ofMillis(100);
@@ -231,4 +238,85 @@ class ConcurrentTestRunnerTest {
assertThat(queue).containsOnly("0:0", "0:1", "1:0", "1:1");
}
+
+ @Test
+ void runRandomlyDistributedOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException {
+ AtomicBoolean firstOperationRun = new AtomicBoolean(false);
+ AtomicBoolean secondOperationRun = new AtomicBoolean(false);
+ AtomicBoolean thirdOperationRun = new AtomicBoolean(false);
+ ConcurrentTestRunner.builder()
+ .randomlyDistributedOperations(
+ (threadNumber, step) -> firstOperationRun.set(true),
+ (threadNumber, step) -> secondOperationRun.set(true),
+ (threadNumber, step) -> thirdOperationRun.set(true))
+ .threadCount(10)
+ .operationCount(10)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true);
+ }
+
+ @Test
+ void runRandomlyDistributedOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException {
+ AtomicInteger firstOperationRuns = new AtomicInteger(0);
+ AtomicInteger secondOperationRuns = new AtomicInteger(0);
+ AtomicInteger thirdOperationRuns = new AtomicInteger(0);
+ int threadCount = 10;
+ int operationCount = 1000;
+ ConcurrentTestRunner.builder()
+ .randomlyDistributedOperations(
+ (threadNumber, step) -> firstOperationRuns.incrementAndGet(),
+ (threadNumber, step) -> secondOperationRuns.incrementAndGet(),
+ (threadNumber, step) -> thirdOperationRuns.incrementAndGet())
+ .threadCount(threadCount)
+ .operationCount(operationCount)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics();
+ int min = statistics.getMin();
+ int max = statistics.getMax();
+
+ assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100);
+ }
+
+ @Test
+ void runRandomlyDistributedReactorOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException {
+ AtomicBoolean firstOperationRun = new AtomicBoolean(false);
+ AtomicBoolean secondOperationRun = new AtomicBoolean(false);
+ AtomicBoolean thirdOperationRun = new AtomicBoolean(false);
+ ConcurrentTestRunner.builder()
+ .randomlyDistributedReactorOperations(
+ (threadNumber, step) -> Mono.fromRunnable(() -> firstOperationRun.set(true)),
+ (threadNumber, step) -> Mono.fromRunnable(() -> secondOperationRun.set(true)),
+ (threadNumber, step) -> Mono.fromRunnable(() -> thirdOperationRun.set(true)))
+ .threadCount(10)
+ .operationCount(10)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true);
+ }
+
+ @Test
+ void runRandomlyDistributedReactorOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException {
+ AtomicInteger firstOperationRuns = new AtomicInteger(0);
+ AtomicInteger secondOperationRuns = new AtomicInteger(0);
+ AtomicInteger thirdOperationRuns = new AtomicInteger(0);
+ int threadCount = 10;
+ int operationCount = 1000;
+ ConcurrentTestRunner.builder()
+ .randomlyDistributedReactorOperations(
+ (threadNumber, step) -> Mono.fromRunnable(firstOperationRuns::incrementAndGet),
+ (threadNumber, step) -> Mono.fromRunnable(secondOperationRuns::incrementAndGet),
+ (threadNumber, step) -> Mono.fromRunnable(thirdOperationRuns::incrementAndGet))
+ .threadCount(threadCount)
+ .operationCount(operationCount)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics();
+ int min = statistics.getMin();
+ int max = statistics.getMax();
+
+ assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org