You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/01/14 09:00:09 UTC

[james-project] 01/02: JAMES-3025 introduce randomlyDistributed operations in ConcurrentTestRunner

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7bd038358341b4cabf70b4baa41f3dfc68cc6786
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jan 9 11:45:41 2020 +0100

    JAMES-3025 introduce randomlyDistributed operations in ConcurrentTestRunner
---
 .../util/concurrency/ConcurrentTestRunner.java     | 27 ++++++-
 .../util/concurrency/ConcurrentTestRunnerTest.java | 88 ++++++++++++++++++++++
 2 files changed, 114 insertions(+), 1 deletion(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index ed1e2bd..a47fe94 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -25,6 +25,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -39,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-
 import reactor.core.publisher.Mono;
 
 public class ConcurrentTestRunner implements Closeable {
@@ -53,6 +53,31 @@ public class ConcurrentTestRunner implements Closeable {
         default RequireThreadCount reactorOperation(ReactorOperation reactorOperation) {
             return operation(reactorOperation.blocking());
         }
+
+        default RequireThreadCount randomlyDistributedOperations(ConcurrentOperation firstOperation, ConcurrentOperation... operations) {
+            Random random = createReproductibleRandom();
+            ConcurrentOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstOperation, operations).execute(threadNumber, step);
+            return operation(aggregateOperation);
+        }
+
+        default RequireThreadCount randomlyDistributedReactorOperations(ReactorOperation firstReactorOperation, ReactorOperation... reactorOperations) {
+            Random random = createReproductibleRandom();
+            ReactorOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstReactorOperation, reactorOperations).execute(threadNumber, step);
+            return reactorOperation(aggregateOperation);
+        }
+
+        default Random createReproductibleRandom() {
+            return new Random(2134);
+        }
+
+        default <OperationT> OperationT selectRandomOperation(Random random, OperationT firstReactorOperation, OperationT... reactorOperations) {
+            int whichAction = random.nextInt(reactorOperations.length + 1);
+            if (whichAction == 0) {
+                return firstReactorOperation;
+            } else {
+                return reactorOperations[whichAction - 1];
+            }
+        }
     }
 
     @FunctionalInterface
diff --git a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
index b40077e..39c1d20 100644
--- a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
@@ -26,12 +26,19 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.IntSummaryStatistics;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Mono;
+
 class ConcurrentTestRunnerTest {
     private static final ConcurrentTestRunner.ConcurrentOperation NOOP = (threadNumber, step) -> { };
     private static final Duration DEFAULT_AWAIT_TIME = Duration.ofMillis(100);
@@ -231,4 +238,85 @@ class ConcurrentTestRunnerTest {
 
         assertThat(queue).containsOnly("0:0", "0:1", "1:0", "1:1");
     }
+
+    @Test
+    void runRandomlyDistributedOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException {
+        AtomicBoolean firstOperationRun = new AtomicBoolean(false);
+        AtomicBoolean secondOperationRun = new AtomicBoolean(false);
+        AtomicBoolean thirdOperationRun = new AtomicBoolean(false);
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedOperations(
+                (threadNumber, step) -> firstOperationRun.set(true),
+                (threadNumber, step) -> secondOperationRun.set(true),
+                (threadNumber, step) -> thirdOperationRun.set(true))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true);
+    }
+
+    @Test
+    void runRandomlyDistributedOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException {
+        AtomicInteger firstOperationRuns = new AtomicInteger(0);
+        AtomicInteger secondOperationRuns = new AtomicInteger(0);
+        AtomicInteger thirdOperationRuns = new AtomicInteger(0);
+        int threadCount = 10;
+        int operationCount = 1000;
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedOperations(
+                (threadNumber, step) -> firstOperationRuns.incrementAndGet(),
+                (threadNumber, step) -> secondOperationRuns.incrementAndGet(),
+                (threadNumber, step) -> thirdOperationRuns.incrementAndGet())
+            .threadCount(threadCount)
+            .operationCount(operationCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics();
+        int min = statistics.getMin();
+        int max = statistics.getMax();
+
+        assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100);
+    }
+
+    @Test
+    void runRandomlyDistributedReactorOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException {
+        AtomicBoolean firstOperationRun = new AtomicBoolean(false);
+        AtomicBoolean secondOperationRun = new AtomicBoolean(false);
+        AtomicBoolean thirdOperationRun = new AtomicBoolean(false);
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedReactorOperations(
+                (threadNumber, step) -> Mono.fromRunnable(() -> firstOperationRun.set(true)),
+                (threadNumber, step) -> Mono.fromRunnable(() -> secondOperationRun.set(true)),
+                (threadNumber, step) -> Mono.fromRunnable(() -> thirdOperationRun.set(true)))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true);
+    }
+
+    @Test
+    void runRandomlyDistributedReactorOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException {
+        AtomicInteger firstOperationRuns = new AtomicInteger(0);
+        AtomicInteger secondOperationRuns = new AtomicInteger(0);
+        AtomicInteger thirdOperationRuns = new AtomicInteger(0);
+        int threadCount = 10;
+        int operationCount = 1000;
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedReactorOperations(
+                (threadNumber, step) -> Mono.fromRunnable(firstOperationRuns::incrementAndGet),
+                (threadNumber, step) -> Mono.fromRunnable(secondOperationRuns::incrementAndGet),
+                (threadNumber, step) -> Mono.fromRunnable(thirdOperationRuns::incrementAndGet))
+            .threadCount(threadCount)
+            .operationCount(operationCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics();
+        int min = statistics.getMin();
+        int max = statistics.getMax();
+
+        assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100);
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org