You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2019/12/02 03:01:07 UTC
[james-project] 04/05: JAMES-2990 ConcurrentTestRunner adopt Publisher<Void> returned operation
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 691006ec725624cab787e5a9cf21a4fb52f60d84
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Fri Nov 29 14:39:55 2019 +0700
JAMES-2990 ConcurrentTestRunner adopt Publisher<Void> returned operation
---
.../james/util/concurrency/ConcurrentTestRunner.java | 19 +++++++++++++++++++
.../jmap/api/preview/MessagePreviewStoreContract.java | 8 ++------
2 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index 7ff1fbc..ed1e2bd 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -34,11 +34,14 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
+
public class ConcurrentTestRunner implements Closeable {
public static final int DEFAULT_OPERATION_COUNT = 1;
@@ -46,6 +49,10 @@ public class ConcurrentTestRunner implements Closeable {
@FunctionalInterface
public interface RequireOperation {
RequireThreadCount operation(ConcurrentOperation operation);
+
+ default RequireThreadCount reactorOperation(ReactorOperation reactorOperation) {
+ return operation(reactorOperation.blocking());
+ }
}
@FunctionalInterface
@@ -97,10 +104,22 @@ public class ConcurrentTestRunner implements Closeable {
}
}
+ @FunctionalInterface
public interface ConcurrentOperation {
void execute(int threadNumber, int step) throws Exception;
}
+ @FunctionalInterface
+ public interface ReactorOperation {
+ Publisher<Void> execute(int threadNumber, int step) throws Exception;
+
+ default ConcurrentOperation blocking() {
+ return (threadNumber, step) -> Mono.from(execute(threadNumber, step))
+ .then()
+ .block();
+ }
+ }
+
private class ConcurrentRunnableTask implements Runnable {
private final int threadNumber;
private final ConcurrentOperation concurrentOperation;
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java
index 335c0e2..297e0b6 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/preview/MessagePreviewStoreContract.java
@@ -24,14 +24,12 @@ import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Duration;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.TestMessageId;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.SoftAssertions;
-import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
@@ -126,9 +124,8 @@ public interface MessagePreviewStoreContract {
int stepCount = 100;
ConcurrentTestRunner.builder()
- .operation((thread, step) -> Mono.from(testee()
+ .reactorOperation((thread, step) -> testee()
.store(TestMessageId.of(thread), Preview.from(String.valueOf(step))))
- .block())
.threadCount(threadCount)
.operationCount(stepCount)
.runSuccessfullyWithin(Duration.ofMinutes(1));
@@ -146,9 +143,8 @@ public interface MessagePreviewStoreContract {
int operationCount = 100;
ConcurrentTestRunner.builder()
- .operation((thread, step) -> Mono.from(testee()
+ .reactorOperation((thread, step) -> testee()
.store(MESSAGE_ID_1, Preview.from(String.valueOf(step * threadCount + thread))))
- .block())
.threadCount(threadCount)
.operationCount(operationCount)
.runSuccessfullyWithin(Duration.ofMinutes(1));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org