You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2018/12/14 08:25:45 UTC

[2/2] james-project git commit: JAMES-2629 Migrate CassandraAsyncExecutor to Reactor

JAMES-2629 Migrate CassandraAsyncExecutor to Reactor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/629de6f3
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/629de6f3
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/629de6f3

Branch: refs/heads/master
Commit: 629de6f326d96d1420cbc9b5ead12956c31f8695
Parents: 3439628
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Wed Dec 12 14:12:47 2018 +0100
Committer: Raphael Ouazana <ra...@linagora.com>
Committed: Fri Dec 14 09:25:24 2018 +0100

----------------------------------------------------------------------
 backends-common/cassandra/pom.xml               |  4 ++
 .../cassandra/utils/CassandraAsyncExecutor.java | 52 +++++++++++++++-----
 2 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/629de6f3/backends-common/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/pom.xml b/backends-common/cassandra/pom.xml
index 9b8d1d8..6b35ab1 100644
--- a/backends-common/cassandra/pom.xml
+++ b/backends-common/cassandra/pom.xml
@@ -85,6 +85,10 @@
             <artifactId>reactor-core</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/629de6f3/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 7815643..f1084f7 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -30,6 +30,8 @@ import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
 
 import net.javacrumbs.futureconverter.java8guava.FutureConverter;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraAsyncExecutor {
 
@@ -41,29 +43,57 @@ public class CassandraAsyncExecutor {
     }
 
     public CompletableFuture<ResultSet> execute(Statement statement) {
-        return FutureConverter.toCompletableFuture(session.executeAsync(statement));
+        return executeReactor(statement).toFuture();
     }
 
 
     public CompletableFuture<Boolean> executeReturnApplied(Statement statement) {
-        return FutureConverter.toCompletableFuture(session.executeAsync(statement))
-            .thenApply(ResultSet::one)
-            .thenApply(row -> row.getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED));
+        return executeReturnAppliedReactor(statement).toFuture();
     }
 
     public CompletableFuture<Void> executeVoid(Statement statement) {
-        return FutureConverter.toCompletableFuture(session.executeAsync(statement))
-            .thenAccept(result -> { });
+        return executeVoidReactor(statement).toFuture();
     }
 
     public CompletableFuture<Optional<Row>> executeSingleRow(Statement statement) {
-        return execute(statement)
-            .thenApply(ResultSet::one)
-            .thenApply(Optional::ofNullable);
+        return executeSingleRowOptionalReactor(statement)
+                .toFuture();
     }
 
     public CompletableFuture<Boolean> executeReturnExists(Statement statement) {
-        return executeSingleRow(statement)
-            .thenApply(Optional::isPresent);
+        return executeReturnExistsReactor(statement).toFuture();
+    }
+
+    public Mono<ResultSet> executeReactor(Statement statement) {
+        return Mono.fromFuture(FutureConverter
+                .toCompletableFuture(session.executeAsync(statement)))
+                .publishOn(Schedulers.elastic());
+    }
+
+
+    public Mono<Boolean> executeReturnAppliedReactor(Statement statement) {
+        return executeReactor(statement)
+                .map(ResultSet::one)
+                .map(row -> row.getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED));
+    }
+
+    public Mono<Void> executeVoidReactor(Statement statement) {
+        return executeReactor(statement)
+                .then();
+    }
+
+    public Mono<Row> executeSingleRowReactor(Statement statement) {
+        return executeSingleRowOptionalReactor(statement)
+                .flatMap(Mono::justOrEmpty);
+    }
+
+    private Mono<Optional<Row>> executeSingleRowOptionalReactor(Statement statement) {
+        return executeReactor(statement)
+            .map(resultSet -> Optional.ofNullable(resultSet.one()));
+    }
+
+    public Mono<Boolean> executeReturnExistsReactor(Statement statement) {
+        return executeSingleRowReactor(statement)
+                .hasElement();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org