You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:17 UTC
[02/12] james-project git commit: JAMES-2630 Migrate CassandraAsyncExecutor consumers to Reactor for event-sourcing-event-store-cassandra
JAMES-2630 Migrate CassandraAsyncExecutor consumers to Reactor for event-sourcing-event-store-cassandra
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2133f0b1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2133f0b1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2133f0b1
Branch: refs/heads/master
Commit: 2133f0b1cd4362fda2f9541ccc02d4703cc3ddec
Parents: 9c96e60
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Wed Dec 12 14:28:21 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Mon Jan 28 15:30:53 2019 +0100
----------------------------------------------------------------------
.../eventstore/cassandra/CassandraEventStore.java | 2 +-
.../eventsourcing/eventstore/cassandra/EventStoreDao.java | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/2133f0b1/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
index 7804de1..79f6646 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
@@ -51,7 +51,7 @@ public class CassandraEventStore implements EventStore {
public void doAppendAll(List<Event> events) {
Preconditions.checkArgument(Event.belongsToSameAggregate(events));
- boolean success = eventStoreDao.appendAll(events).join();
+ boolean success = eventStoreDao.appendAll(events).block();
if (!success) {
throw new EventStoreFailedException("Concurrent update to the EventStore detected");
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/2133f0b1/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
index feb6ef7..e1e55c7 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
@@ -30,7 +30,6 @@ import static org.apache.james.eventsourcing.eventstore.cassandra.CassandraEvent
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -48,6 +47,7 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Mono;
public class EventStoreDao {
private final CassandraUtils cassandraUtils;
@@ -79,10 +79,10 @@ public class EventStoreDao {
.where(eq(AGGREGATE_ID, bindMarker(AGGREGATE_ID))));
}
- public CompletableFuture<Boolean> appendAll(List<Event> events) {
+ public Mono<Boolean> appendAll(List<Event> events) {
BatchStatement batch = new BatchStatement();
events.forEach(event -> batch.add(insertEvent(event)));
- return cassandraAsyncExecutor.executeReturnApplied(batch);
+ return cassandraAsyncExecutor.executeReturnAppliedReactor(batch);
}
private BoundStatement insertEvent(Event event) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org