You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2019/12/02 03:01:07 UTC

[james-project] 04/05: JAMES-2990 ConcurrentTestRunner adopt Publisher returned operation

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 691006ec725624cab787e5a9cf21a4fb52f60d84
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Fri Nov 29 14:39:55 2019 +0700

    JAMES-2990 ConcurrentTestRunner adopt Publisher<Void> returned operation
---
 .../james/util/concurrency/ConcurrentTestRunner.java  | 19 +++++++++++++++++++
 .../jmap/api/preview/MessagePreviewStoreContract.java |  8 ++------
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index 7ff1fbc..ed1e2bd 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -34,11 +34,14 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class ConcurrentTestRunner implements Closeable {
 
     public static final int DEFAULT_OPERATION_COUNT = 1;
@@ -46,6 +49,10 @@ public class ConcurrentTestRunner implements Closeable {
     @FunctionalInterface
     public interface RequireOperation {
         RequireThreadCount operation(ConcurrentOperation operation);
+
+        default RequireThreadCount reactorOperation(ReactorOperation reactorOperation) {
+            return operation(reactorOperation.blocking());
+        }
     }
 
     @FunctionalInterface
@@ -97,10 +104,22 @@ public class ConcurrentTestRunner implements Closeable {
         }
     }
 
+    @FunctionalInterface
     public interface ConcurrentOperation {
         void execute(int threadNumber, int step) throws Exception;
     }
 
+    @FunctionalInterface
+    public interface ReactorOperation {
+        Publisher<Void> execute(int threadNumber, int step) throws Exception;
+
+        default ConcurrentOperation blocking() {
+            return (threadNumber, step) -> Mono.from(execute(threadNumber, step))
+                .then()
+                .block();
+        }
+    }
+
     private class ConcurrentRunnableTask implements Runnable {
         private final int threadNumber;
         private final ConcurrentOperation concurrentOperation;
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java
index 335c0e2..297e0b6 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java
@@ -24,14 +24,12 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.TestMessageId;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.assertj.core.api.SoftAssertions;
-import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import reactor.core.publisher.Mono;
@@ -126,9 +124,8 @@ public interface MessagePreviewStoreContract {
         int stepCount = 100;
 
         ConcurrentTestRunner.builder()
-            .operation((thread, step) -> Mono.from(testee()
+            .reactorOperation((thread, step) -> testee()
                 .store(TestMessageId.of(thread), Preview.from(String.valueOf(step))))
-                .block())
             .threadCount(threadCount)
             .operationCount(stepCount)
             .runSuccessfullyWithin(Duration.ofMinutes(1));
@@ -146,9 +143,8 @@ public interface MessagePreviewStoreContract {
         int operationCount = 100;
 
         ConcurrentTestRunner.builder()
-            .operation((thread, step) -> Mono.from(testee()
+            .reactorOperation((thread, step) -> testee()
                 .store(MESSAGE_ID_1, Preview.from(String.valueOf(step * threadCount + thread))))
-                .block())
             .threadCount(threadCount)
             .operationCount(operationCount)
             .runSuccessfullyWithin(Duration.ofMinutes(1));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org