You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2018/12/18 08:56:01 UTC
james-project git commit: JAMES-2623 port Runnables to Reactor to avoid using Common ForkJoin pool
Repository: james-project
Updated Branches:
refs/heads/master d04e65506 -> db1c5dac5
JAMES-2623 port Runnables to Reactor to avoid using Common ForkJoin pool
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/db1c5dac
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/db1c5dac
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/db1c5dac
Branch: refs/heads/master
Commit: db1c5dac54e711f7d09e5ba38ce8824a5b51d015
Parents: d04e655
Author: Matthieu Baechler <ma...@apache.org>
Authored: Sat Nov 24 09:01:13 2018 +0100
Committer: Raphael Ouazana <ra...@linagora.com>
Committed: Tue Dec 18 09:55:37 2018 +0100
----------------------------------------------------------------------
.../org/apache/james/CleanupTasksPerformer.java | 6 ++-
.../apache/james/AggregateJunitExtension.java | 21 ++++-----
server/container/util/pom.xml | 4 ++
.../java/org/apache/james/util/Runnables.java | 34 +++++++-------
.../org/apache/james/util/RunnablesTest.java | 49 ++++++++++++++++++++
5 files changed, 83 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
----------------------------------------------------------------------
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java b/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
index 0fcbdb9..56772ac 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
@@ -26,6 +26,8 @@ import javax.inject.Inject;
import org.apache.james.task.Task;
import org.apache.james.util.Runnables;
+import reactor.core.publisher.Flux;
+
public class CleanupTasksPerformer {
public interface CleanupTask extends Task {
@@ -41,8 +43,8 @@ public class CleanupTasksPerformer {
public void clean() {
Runnables
- .runParrallelStream(
- cleanupTasks.stream()
+ .runParallel(
+ Flux.fromIterable(cleanupTasks)
.map(cleanupTask -> cleanupTask::run));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
----------------------------------------------------------------------
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java b/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
index a8d867c..8cf131c 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.ParameterResolutionException;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.Lists;
+import reactor.core.publisher.Flux;
public class AggregateJunitExtension implements RegistrableExtension {
@@ -40,30 +41,26 @@ public class AggregateJunitExtension implements RegistrableExtension {
@Override
public void beforeAll(ExtensionContext extensionContext) {
- Runnables.runParrallelStream(registrableExtensions
- .stream()
- .map(ext -> Throwing.runnable(() -> ext.beforeAll(extensionContext))));
+ Runnables.runParallel(Flux.fromIterable(registrableExtensions)
+ .map(ext -> Throwing.runnable(() -> ext.beforeAll(extensionContext))));
}
@Override
public void beforeEach(ExtensionContext extensionContext) {
- Runnables.runParrallelStream(registrableExtensions
- .stream()
- .map(ext -> Throwing.runnable(() -> ext.beforeEach(extensionContext))));
+ Runnables.runParallel(Flux.fromIterable(registrableExtensions)
+ .map(ext -> Throwing.runnable(() -> ext.beforeEach(extensionContext))));
}
@Override
public void afterEach(ExtensionContext extensionContext) {
- Runnables.runParrallelStream(Lists.reverse(registrableExtensions)
- .stream()
- .map(ext -> Throwing.runnable(() -> ext.afterEach(extensionContext))));
+ Runnables.runParallel(Flux.fromIterable(Lists.reverse(registrableExtensions))
+ .map(ext -> Throwing.runnable(() -> ext.afterEach(extensionContext))));
}
@Override
public void afterAll(ExtensionContext extensionContext) {
- Runnables.runParrallelStream(Lists.reverse(registrableExtensions)
- .stream()
- .map(ext -> Throwing.runnable(() -> ext.afterAll(extensionContext))));
+ Runnables.runParallel(Flux.fromIterable(Lists.reverse(registrableExtensions))
+ .map(ext -> Throwing.runnable(() -> ext.afterAll(extensionContext))));
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml
index 47933e0..fa76097 100644
--- a/server/container/util/pom.xml
+++ b/server/container/util/pom.xml
@@ -71,6 +71,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/src/main/java/org/apache/james/util/Runnables.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/Runnables.java b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
index 0d4bc91..b0f707e 100644
--- a/server/container/util/src/main/java/org/apache/james/util/Runnables.java
+++ b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
@@ -19,27 +19,27 @@
package org.apache.james.util;
-import java.util.Arrays;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-import java.util.stream.Stream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class Runnables {
public static void runParallel(Runnable... runnables) {
- Stream<Runnable> stream = Arrays.stream(runnables);
- runParrallelStream(stream);
+ Flux<Runnable> stream = Flux.just(runnables);
+ runParallel(stream);
}
- public static void runParrallelStream(Stream<Runnable> stream) {
- FluentFutureStream.of(stream
- .map(runnable -> CompletableFuture.supplyAsync(toVoidSupplier(runnable))))
- .join();
- }
-
- private static Supplier<Void> toVoidSupplier(Runnable runnable) {
- return () -> {
- runnable.run();
- return null;
- };
+ public static void runParallel(Flux<Runnable> runnables) {
+ runnables
+ .publishOn(Schedulers.elastic())
+ .parallel()
+ .runOn(Schedulers.elastic())
+ .flatMap(runnable -> {
+ runnable.run();
+ return Mono.empty();
+ })
+ .sequential()
+ .then()
+ .block();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java b/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java
new file mode 100644
index 0000000..e080dcc
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.james.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+
+class RunnablesTest {
+
+ @Test
+ void shouldActuallyRunThings() {
+ AtomicBoolean sideEffect = new AtomicBoolean(false);
+ Runnables.runParallel(() -> sideEffect.set(true));
+ assertThat(sideEffect).isTrue();
+ }
+
+ @Test
+ void shouldActuallyRunInParallel() throws InterruptedException {
+ int parallel = 2;
+ CountDownLatch countDownLatch = new CountDownLatch(parallel);
+ Runnable runnable = countDownLatch::countDown;
+ Runnables.runParallel(Flux.range(0, 2).map(i -> runnable));
+ assertThat(countDownLatch.await(2, TimeUnit.MINUTES)).isTrue();
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org