You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2018/12/14 08:25:45 UTC
[2/2] james-project git commit: JAMES-2629 Migrate
CassandraAsyncExecutor to Reactor
JAMES-2629 Migrate CassandraAsyncExecutor to Reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/629de6f3
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/629de6f3
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/629de6f3
Branch: refs/heads/master
Commit: 629de6f326d96d1420cbc9b5ead12956c31f8695
Parents: 3439628
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Wed Dec 12 14:12:47 2018 +0100
Committer: Raphael Ouazana <ra...@linagora.com>
Committed: Fri Dec 14 09:25:24 2018 +0100
----------------------------------------------------------------------
backends-common/cassandra/pom.xml | 4 ++
.../cassandra/utils/CassandraAsyncExecutor.java | 52 +++++++++++++++-----
2 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/629de6f3/backends-common/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/pom.xml b/backends-common/cassandra/pom.xml
index 9b8d1d8..6b35ab1 100644
--- a/backends-common/cassandra/pom.xml
+++ b/backends-common/cassandra/pom.xml
@@ -85,6 +85,10 @@
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/629de6f3/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 7815643..f1084f7 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -30,6 +30,8 @@ import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import net.javacrumbs.futureconverter.java8guava.FutureConverter;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class CassandraAsyncExecutor {
@@ -41,29 +43,57 @@ public class CassandraAsyncExecutor {
}
public CompletableFuture<ResultSet> execute(Statement statement) {
- return FutureConverter.toCompletableFuture(session.executeAsync(statement));
+ return executeReactor(statement).toFuture();
}
public CompletableFuture<Boolean> executeReturnApplied(Statement statement) {
- return FutureConverter.toCompletableFuture(session.executeAsync(statement))
- .thenApply(ResultSet::one)
- .thenApply(row -> row.getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED));
+ return executeReturnAppliedReactor(statement).toFuture();
}
public CompletableFuture<Void> executeVoid(Statement statement) {
- return FutureConverter.toCompletableFuture(session.executeAsync(statement))
- .thenAccept(result -> { });
+ return executeVoidReactor(statement).toFuture();
}
public CompletableFuture<Optional<Row>> executeSingleRow(Statement statement) {
- return execute(statement)
- .thenApply(ResultSet::one)
- .thenApply(Optional::ofNullable);
+ return executeSingleRowOptionalReactor(statement)
+ .toFuture();
}
public CompletableFuture<Boolean> executeReturnExists(Statement statement) {
- return executeSingleRow(statement)
- .thenApply(Optional::isPresent);
+ return executeReturnExistsReactor(statement).toFuture();
+ }
+
+ public Mono<ResultSet> executeReactor(Statement statement) {
+ return Mono.fromFuture(FutureConverter
+ .toCompletableFuture(session.executeAsync(statement)))
+ .publishOn(Schedulers.elastic());
+ }
+
+
+ public Mono<Boolean> executeReturnAppliedReactor(Statement statement) {
+ return executeReactor(statement)
+ .map(ResultSet::one)
+ .map(row -> row.getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED));
+ }
+
+ public Mono<Void> executeVoidReactor(Statement statement) {
+ return executeReactor(statement)
+ .then();
+ }
+
+ public Mono<Row> executeSingleRowReactor(Statement statement) {
+ return executeSingleRowOptionalReactor(statement)
+ .flatMap(Mono::justOrEmpty);
+ }
+
+ private Mono<Optional<Row>> executeSingleRowOptionalReactor(Statement statement) {
+ return executeReactor(statement)
+ .map(resultSet -> Optional.ofNullable(resultSet.one()));
+ }
+
+ public Mono<Boolean> executeReturnExistsReactor(Statement statement) {
+ return executeSingleRowReactor(statement)
+ .hasElement();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org