You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:17 UTC

[02/12] james-project git commit: JAMES-2630 Migrate CassandraAsyncExecutor consumers to Reactor for event-sourcing-event-store-cassandra

JAMES-2630 Migrate CassandraAsyncExecutor consumers to Reactor for event-sourcing-event-store-cassandra


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2133f0b1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2133f0b1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2133f0b1

Branch: refs/heads/master
Commit: 2133f0b1cd4362fda2f9541ccc02d4703cc3ddec
Parents: 9c96e60
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Wed Dec 12 14:28:21 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Mon Jan 28 15:30:53 2019 +0100

----------------------------------------------------------------------
 .../eventstore/cassandra/CassandraEventStore.java              | 2 +-
 .../eventsourcing/eventstore/cassandra/EventStoreDao.java      | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/2133f0b1/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
index 7804de1..79f6646 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
@@ -51,7 +51,7 @@ public class CassandraEventStore implements EventStore {
     public void doAppendAll(List<Event> events) {
         Preconditions.checkArgument(Event.belongsToSameAggregate(events));
 
-        boolean success = eventStoreDao.appendAll(events).join();
+        boolean success = eventStoreDao.appendAll(events).block();
         if (!success) {
             throw new EventStoreFailedException("Concurrent update to the EventStore detected");
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/2133f0b1/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
index feb6ef7..e1e55c7 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
@@ -30,7 +30,6 @@ import static org.apache.james.eventsourcing.eventstore.cassandra.CassandraEvent
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -48,6 +47,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Mono;
 
 public class EventStoreDao {
     private final CassandraUtils cassandraUtils;
@@ -79,10 +79,10 @@ public class EventStoreDao {
             .where(eq(AGGREGATE_ID, bindMarker(AGGREGATE_ID))));
     }
 
-    public CompletableFuture<Boolean> appendAll(List<Event> events) {
+    public Mono<Boolean> appendAll(List<Event> events) {
         BatchStatement batch = new BatchStatement();
         events.forEach(event -> batch.add(insertEvent(event)));
-        return cassandraAsyncExecutor.executeReturnApplied(batch);
+        return cassandraAsyncExecutor.executeReturnAppliedReactor(batch);
     }
 
     private BoundStatement insertEvent(Event event) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org