You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/01/14 09:00:08 UTC

[james-project] branch master updated (e3c0b3f -> 350dd3c)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from e3c0b3f  [Refactoring] fix Reactor Intellij inspections
     new 7bd0383  JAMES-3025 introduce randomlyDistributed operations in ConcurrentTestRunner
     new 350dd3c  JAMES-3025 refactor LocalListenerRegistryTest to use randomlyDistributedOperations

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mailbox/events/LocalListenerRegistryTest.java  | 25 +++---
 .../util/concurrency/ConcurrentTestRunner.java     | 27 ++++++-
 .../util/concurrency/ConcurrentTestRunnerTest.java | 88 ++++++++++++++++++++++
 3 files changed, 124 insertions(+), 16 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/02: JAMES-3025 refactor LocalListenerRegistryTest to use randomlyDistributedOperations

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 350dd3cf212b174edbe4c86f6fca9f7a9dd4d2f6
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jan 9 14:29:20 2020 +0100

    JAMES-3025 refactor LocalListenerRegistryTest to use randomlyDistributedOperations
---
 .../mailbox/events/LocalListenerRegistryTest.java  | 25 +++++++++-------------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
index 8136239..0726f1a 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
@@ -146,15 +146,10 @@ class LocalListenerRegistryTest {
             MailboxListener listener3 = event -> { };
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> {
-                    if (threadNumber % 3 == 0) {
-                        testee.addListener(KEY_1, listener1);
-                    } else if (threadNumber % 3 == 1) {
-                        testee.addListener(KEY_1, listener2);
-                    } else if (threadNumber % 3 == 2) {
-                        testee.addListener(KEY_1, listener3);
-                    }
-                })
+                .randomlyDistributedOperations(
+                    (threadNumber, operationNumber) -> testee.addListener(KEY_1, listener1),
+                    (threadNumber, operationNumber) -> testee.addListener(KEY_1, listener2),
+                    (threadNumber, operationNumber) -> testee.addListener(KEY_1, listener3))
                 .threadCount(6)
                 .operationCount(10)
                 .runSuccessfullyWithin(ONE_SECOND);
@@ -188,24 +183,24 @@ class LocalListenerRegistryTest {
             AtomicInteger firstListenerCount = new AtomicInteger(0);
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> {
-                    if (threadNumber % 3 == 0) {
+                .randomlyDistributedOperations((threadNumber, operationNumber) -> {
                         LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener1);
                         if (registration.isFirstListener()) {
                             firstListenerCount.incrementAndGet();
                         }
-                    } else if (threadNumber % 3 == 1) {
+                    },
+                    (threadNumber, operationNumber) -> {
                         LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener2);
                         if (registration.isFirstListener()) {
                             firstListenerCount.incrementAndGet();
                         }
-                    } else if (threadNumber % 3 == 2) {
+                    },
+                    (threadNumber, operationNumber) -> {
                         LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener3);
                         if (registration.isFirstListener()) {
                             firstListenerCount.incrementAndGet();
                         }
-                    }
-                })
+                    })
                 .threadCount(6)
                 .operationCount(10)
                 .runSuccessfullyWithin(ONE_SECOND);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/02: JAMES-3025 introduce randomlyDistributed operations in ConcurrentTestRunner

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7bd038358341b4cabf70b4baa41f3dfc68cc6786
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jan 9 11:45:41 2020 +0100

    JAMES-3025 introduce randomlyDistributed operations in ConcurrentTestRunner
---
 .../util/concurrency/ConcurrentTestRunner.java     | 27 ++++++-
 .../util/concurrency/ConcurrentTestRunnerTest.java | 88 ++++++++++++++++++++++
 2 files changed, 114 insertions(+), 1 deletion(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index ed1e2bd..a47fe94 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -25,6 +25,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -39,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-
 import reactor.core.publisher.Mono;
 
 public class ConcurrentTestRunner implements Closeable {
@@ -53,6 +53,31 @@ public class ConcurrentTestRunner implements Closeable {
         default RequireThreadCount reactorOperation(ReactorOperation reactorOperation) {
             return operation(reactorOperation.blocking());
         }
+
+        default RequireThreadCount randomlyDistributedOperations(ConcurrentOperation firstOperation, ConcurrentOperation... operations) {
+            Random random = createReproductibleRandom();
+            ConcurrentOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstOperation, operations).execute(threadNumber, step);
+            return operation(aggregateOperation);
+        }
+
+        default RequireThreadCount randomlyDistributedReactorOperations(ReactorOperation firstReactorOperation, ReactorOperation... reactorOperations) {
+            Random random = createReproductibleRandom();
+            ReactorOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstReactorOperation, reactorOperations).execute(threadNumber, step);
+            return reactorOperation(aggregateOperation);
+        }
+
+        default Random createReproductibleRandom() {
+            return new Random(2134);
+        }
+
+        default <OperationT> OperationT selectRandomOperation(Random random, OperationT firstReactorOperation, OperationT... reactorOperations) {
+            int whichAction = random.nextInt(reactorOperations.length + 1);
+            if (whichAction == 0) {
+                return firstReactorOperation;
+            } else {
+                return reactorOperations[whichAction - 1];
+            }
+        }
     }
 
     @FunctionalInterface
diff --git a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
index b40077e..39c1d20 100644
--- a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
@@ -26,12 +26,19 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.IntSummaryStatistics;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Mono;
+
 class ConcurrentTestRunnerTest {
     private static final ConcurrentTestRunner.ConcurrentOperation NOOP = (threadNumber, step) -> { };
     private static final Duration DEFAULT_AWAIT_TIME = Duration.ofMillis(100);
@@ -231,4 +238,85 @@ class ConcurrentTestRunnerTest {
 
         assertThat(queue).containsOnly("0:0", "0:1", "1:0", "1:1");
     }
+
+    @Test
+    void runRandomlyDistributedOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException {
+        AtomicBoolean firstOperationRun = new AtomicBoolean(false);
+        AtomicBoolean secondOperationRun = new AtomicBoolean(false);
+        AtomicBoolean thirdOperationRun = new AtomicBoolean(false);
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedOperations(
+                (threadNumber, step) -> firstOperationRun.set(true),
+                (threadNumber, step) -> secondOperationRun.set(true),
+                (threadNumber, step) -> thirdOperationRun.set(true))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true);
+    }
+
+    @Test
+    void runRandomlyDistributedOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException {
+        AtomicInteger firstOperationRuns = new AtomicInteger(0);
+        AtomicInteger secondOperationRuns = new AtomicInteger(0);
+        AtomicInteger thirdOperationRuns = new AtomicInteger(0);
+        int threadCount = 10;
+        int operationCount = 1000;
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedOperations(
+                (threadNumber, step) -> firstOperationRuns.incrementAndGet(),
+                (threadNumber, step) -> secondOperationRuns.incrementAndGet(),
+                (threadNumber, step) -> thirdOperationRuns.incrementAndGet())
+            .threadCount(threadCount)
+            .operationCount(operationCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics();
+        int min = statistics.getMin();
+        int max = statistics.getMax();
+
+        assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100);
+    }
+
+    @Test
+    void runRandomlyDistributedReactorOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException {
+        AtomicBoolean firstOperationRun = new AtomicBoolean(false);
+        AtomicBoolean secondOperationRun = new AtomicBoolean(false);
+        AtomicBoolean thirdOperationRun = new AtomicBoolean(false);
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedReactorOperations(
+                (threadNumber, step) -> Mono.fromRunnable(() -> firstOperationRun.set(true)),
+                (threadNumber, step) -> Mono.fromRunnable(() -> secondOperationRun.set(true)),
+                (threadNumber, step) -> Mono.fromRunnable(() -> thirdOperationRun.set(true)))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true);
+    }
+
+    @Test
+    void runRandomlyDistributedReactorOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException {
+        AtomicInteger firstOperationRuns = new AtomicInteger(0);
+        AtomicInteger secondOperationRuns = new AtomicInteger(0);
+        AtomicInteger thirdOperationRuns = new AtomicInteger(0);
+        int threadCount = 10;
+        int operationCount = 1000;
+        ConcurrentTestRunner.builder()
+            .randomlyDistributedReactorOperations(
+                (threadNumber, step) -> Mono.fromRunnable(firstOperationRuns::incrementAndGet),
+                (threadNumber, step) -> Mono.fromRunnable(secondOperationRuns::incrementAndGet),
+                (threadNumber, step) -> Mono.fromRunnable(thirdOperationRuns::incrementAndGet))
+            .threadCount(threadCount)
+            .operationCount(operationCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics();
+        int min = statistics.getMin();
+        int max = statistics.getMax();
+
+        assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100);
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org