You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2018/12/18 08:56:01 UTC

james-project git commit: JAMES-2623 port Runnables to Reactor to avoid using Common ForkJoin pool

Repository: james-project
Updated Branches:
  refs/heads/master d04e65506 -> db1c5dac5


JAMES-2623 port Runnables to Reactor to avoid using Common ForkJoin pool


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/db1c5dac
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/db1c5dac
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/db1c5dac

Branch: refs/heads/master
Commit: db1c5dac54e711f7d09e5ba38ce8824a5b51d015
Parents: d04e655
Author: Matthieu Baechler <ma...@apache.org>
Authored: Sat Nov 24 09:01:13 2018 +0100
Committer: Raphael Ouazana <ra...@linagora.com>
Committed: Tue Dec 18 09:55:37 2018 +0100

----------------------------------------------------------------------
 .../org/apache/james/CleanupTasksPerformer.java |  6 ++-
 .../apache/james/AggregateJunitExtension.java   | 21 ++++-----
 server/container/util/pom.xml                   |  4 ++
 .../java/org/apache/james/util/Runnables.java   | 34 +++++++-------
 .../org/apache/james/util/RunnablesTest.java    | 49 ++++++++++++++++++++
 5 files changed, 83 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
----------------------------------------------------------------------
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java b/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
index 0fcbdb9..56772ac 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/CleanupTasksPerformer.java
@@ -26,6 +26,8 @@ import javax.inject.Inject;
 import org.apache.james.task.Task;
 import org.apache.james.util.Runnables;
 
+import reactor.core.publisher.Flux;
+
 public class CleanupTasksPerformer {
 
     public interface CleanupTask extends Task {
@@ -41,8 +43,8 @@ public class CleanupTasksPerformer {
 
     public void clean() {
         Runnables
-            .runParrallelStream(
-                cleanupTasks.stream()
+            .runParallel(
+                Flux.fromIterable(cleanupTasks)
                     .map(cleanupTask -> cleanupTask::run));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
----------------------------------------------------------------------
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java b/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
index a8d867c..8cf131c 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/AggregateJunitExtension.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.ParameterResolutionException;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.Lists;
+import reactor.core.publisher.Flux;
 
 public class AggregateJunitExtension implements RegistrableExtension {
 
@@ -40,30 +41,26 @@ public class AggregateJunitExtension implements RegistrableExtension {
 
     @Override
     public void beforeAll(ExtensionContext extensionContext) {
-        Runnables.runParrallelStream(registrableExtensions
-            .stream()
-            .map(ext -> Throwing.runnable(() -> ext.beforeAll(extensionContext))));
+        Runnables.runParallel(Flux.fromIterable(registrableExtensions)
+                    .map(ext -> Throwing.runnable(() -> ext.beforeAll(extensionContext))));
     }
 
     @Override
     public void beforeEach(ExtensionContext extensionContext) {
-        Runnables.runParrallelStream(registrableExtensions
-            .stream()
-            .map(ext -> Throwing.runnable(() -> ext.beforeEach(extensionContext))));
+        Runnables.runParallel(Flux.fromIterable(registrableExtensions)
+                    .map(ext -> Throwing.runnable(() -> ext.beforeEach(extensionContext))));
     }
 
     @Override
     public void afterEach(ExtensionContext extensionContext) {
-        Runnables.runParrallelStream(Lists.reverse(registrableExtensions)
-            .stream()
-            .map(ext -> Throwing.runnable(() -> ext.afterEach(extensionContext))));
+        Runnables.runParallel(Flux.fromIterable(Lists.reverse(registrableExtensions))
+                    .map(ext -> Throwing.runnable(() -> ext.afterEach(extensionContext))));
     }
 
     @Override
     public void afterAll(ExtensionContext extensionContext) {
-        Runnables.runParrallelStream(Lists.reverse(registrableExtensions)
-            .stream()
-            .map(ext -> Throwing.runnable(() -> ext.afterAll(extensionContext))));
+        Runnables.runParallel(Flux.fromIterable(Lists.reverse(registrableExtensions))
+                    .map(ext -> Throwing.runnable(() -> ext.afterAll(extensionContext))));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml
index 47933e0..fa76097 100644
--- a/server/container/util/pom.xml
+++ b/server/container/util/pom.xml
@@ -71,6 +71,10 @@
             <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/src/main/java/org/apache/james/util/Runnables.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/Runnables.java b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
index 0d4bc91..b0f707e 100644
--- a/server/container/util/src/main/java/org/apache/james/util/Runnables.java
+++ b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
@@ -19,27 +19,27 @@
 
 package org.apache.james.util;
 
-import java.util.Arrays;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-import java.util.stream.Stream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class Runnables {
     public static void runParallel(Runnable... runnables) {
-        Stream<Runnable> stream = Arrays.stream(runnables);
-        runParrallelStream(stream);
+        Flux<Runnable> stream = Flux.just(runnables);
+        runParallel(stream);
     }
 
-    public static void runParrallelStream(Stream<Runnable> stream) {
-        FluentFutureStream.of(stream
-                .map(runnable -> CompletableFuture.supplyAsync(toVoidSupplier(runnable))))
-            .join();
-    }
-
-    private static Supplier<Void> toVoidSupplier(Runnable runnable) {
-        return () -> {
-            runnable.run();
-            return null;
-        };
+    public static void runParallel(Flux<Runnable> runnables) {
+        runnables
+            .publishOn(Schedulers.elastic())
+            .parallel()
+            .runOn(Schedulers.elastic())
+            .flatMap(runnable -> {
+                runnable.run();
+                return Mono.empty();
+            })
+            .sequential()
+            .then()
+            .block();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/db1c5dac/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java b/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java
new file mode 100644
index 0000000..e080dcc
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/RunnablesTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.james.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+
+class RunnablesTest {
+
+    @Test
+    void shouldActuallyRunThings() {
+        AtomicBoolean sideEffect = new AtomicBoolean(false);
+        Runnables.runParallel(() -> sideEffect.set(true));
+        assertThat(sideEffect).isTrue();
+    }
+
+    @Test
+    void shouldActuallyRunInParallel() throws InterruptedException {
+        int parallel = 2;
+        CountDownLatch countDownLatch = new CountDownLatch(parallel);
+        Runnable runnable = countDownLatch::countDown;
+        Runnables.runParallel(Flux.range(0, 2).map(i -> runnable));
+        assertThat(countDownLatch.await(2, TimeUnit.MINUTES)).isTrue();
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org