You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/03/20 02:01:43 UTC

[james-project] 03/09: MAILBOX-373 Refactor CassandraEventDeadLetter

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e51448c6c6455ce76bdb865357386190ce2407f9
Author: datph <dp...@linagora.com>
AuthorDate: Wed Mar 13 17:22:15 2019 +0700

    MAILBOX-373 Refactor CassandraEventDeadLetter
---
 .../mailbox/events/CassandraEventDeadLetters.java  | 21 ++++++-----
 .../events/CassandraEventDeadLettersDAO.java       | 44 ++++++++--------------
 .../events/CassandraEventDeadLettersModule.java    |  2 +-
 .../tables/CassandraEventDeadLettersTable.java     |  2 +-
 .../events/CassandraEventDeadLettersDAOTest.java   | 36 +++++++++---------
 5 files changed, 46 insertions(+), 59 deletions(-)

diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
index b155b3b..39eae1e 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
@@ -39,35 +39,36 @@ public class CassandraEventDeadLetters implements EventDeadLetters {
     }
 
     @Override
-    public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) {
+    public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, InsertionId insertionId) {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
+        Preconditions.checkArgument(insertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
 
-        return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent)
+        return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent, insertionId)
             .then(cassandraEventDeadLettersGroupDAO.storeGroup(registeredGroup));
     }
 
     @Override
-    public Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId) {
+    public Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId) {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
-        Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+        Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
 
-        return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredEventId);
+        return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredInsertionId);
     }
 
     @Override
-    public Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId) {
+    public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
-        Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+        Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
 
-        return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredEventId);
+        return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredInsertionId);
     }
 
     @Override
-    public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
+    public Flux<InsertionId> failedIds(Group registeredGroup) {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
 
-        return cassandraEventDeadLettersDAO.retrieveEventIdsWithGroup(registeredGroup);
+        return cassandraEventDeadLettersDAO.retrieveInsertionIdsWithGroup(registeredGroup);
     }
 
     @Override
diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
index 2056f55..0afe2ba 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
@@ -25,8 +25,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT;
-import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT_ID;
 import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.GROUP;
+import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.INSERTION_ID;
 import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.TABLE_NAME;
 
 import javax.inject.Inject;
@@ -36,7 +36,6 @@ import org.apache.james.event.json.EventSerializer;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
-import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -46,7 +45,6 @@ public class CassandraEventDeadLettersDAO {
     private final EventSerializer eventSerializer;
     private final PreparedStatement insertStatement;
     private final PreparedStatement deleteStatement;
-    private final PreparedStatement selectAllGroupStatement;
     private final PreparedStatement selectEventStatement;
     private final PreparedStatement selectEventIdsWithGroupStatement;
 
@@ -56,15 +54,14 @@ public class CassandraEventDeadLettersDAO {
         this.eventSerializer = eventSerializer;
         this.insertStatement = prepareInsertStatement(session);
         this.deleteStatement = prepareDeleteStatement(session);
-        this.selectAllGroupStatement = prepareSelectAllGroupStatement(session);
         this.selectEventStatement = prepareSelectEventStatement(session);
-        this.selectEventIdsWithGroupStatement = prepareSelectEventIdsWithGroupStatement(session);
+        this.selectEventIdsWithGroupStatement = prepareSelectInsertionIdsWithGroupStatement(session);
     }
 
     private PreparedStatement prepareInsertStatement(Session session) {
         return session.prepare(insertInto(TABLE_NAME)
             .value(GROUP, bindMarker(GROUP))
-            .value(EVENT_ID, bindMarker(EVENT_ID))
+            .value(INSERTION_ID, bindMarker(INSERTION_ID))
             .value(EVENT, bindMarker(EVENT)));
     }
 
@@ -72,57 +69,46 @@ public class CassandraEventDeadLettersDAO {
         return session.prepare(delete()
             .from(TABLE_NAME)
             .where(eq(GROUP, bindMarker(GROUP)))
-            .and(eq(EVENT_ID, bindMarker(EVENT_ID))));
-    }
-
-    private PreparedStatement prepareSelectAllGroupStatement(Session session) {
-        return session.prepare(select(GROUP)
-            .from(TABLE_NAME));
+            .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
     }
 
     private PreparedStatement prepareSelectEventStatement(Session session) {
         return session.prepare(select(EVENT)
             .from(TABLE_NAME)
             .where(eq(GROUP, bindMarker(GROUP)))
-            .and(eq(EVENT_ID, bindMarker(EVENT_ID))));
+            .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
     }
 
-    private PreparedStatement prepareSelectEventIdsWithGroupStatement(Session session) {
-        return session.prepare(select(EVENT_ID)
+    private PreparedStatement prepareSelectInsertionIdsWithGroupStatement(Session session) {
+        return session.prepare(select(INSERTION_ID)
             .from(TABLE_NAME)
             .where(eq(GROUP, bindMarker(GROUP))));
     }
 
-    Mono<Void> store(Group group, Event failedEvent) {
+    Mono<Void> store(Group group, Event failedEvent, EventDeadLetters.InsertionId insertionId) {
         return executor.executeVoid(insertStatement.bind()
                 .setString(GROUP, group.asString())
-                .setUUID(EVENT_ID, failedEvent.getEventId().getId())
+                .setUUID(INSERTION_ID, insertionId.getId())
                 .setString(EVENT, eventSerializer.toJson(failedEvent)));
     }
 
-    Mono<Void> removeEvent(Group group, Event.EventId failedEventId) {
+    Mono<Void> removeEvent(Group group, EventDeadLetters.InsertionId failedInsertionId) {
         return executor.executeVoid(deleteStatement.bind()
                 .setString(GROUP, group.asString())
-                .setUUID(EVENT_ID, failedEventId.getId()));
+                .setUUID(INSERTION_ID, failedInsertionId.getId()));
     }
 
-    Mono<Event> retrieveFailedEvent(Group group, Event.EventId failedEventId) {
+    Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) {
         return executor.executeSingleRow(selectEventStatement.bind()
                 .setString(GROUP, group.asString())
-                .setUUID(EVENT_ID, failedEventId.getId()))
+                .setUUID(INSERTION_ID, insertionId.getId()))
             .map(row -> deserializeEvent(row.getString(EVENT)));
     }
 
-    Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) {
+    Flux<EventDeadLetters.InsertionId> retrieveInsertionIdsWithGroup(Group group) {
         return executor.executeRows(selectEventIdsWithGroupStatement.bind()
                 .setString(GROUP, group.asString()))
-            .map(row -> Event.EventId.of(row.getUUID(EVENT_ID)));
-    }
-
-    Flux<Group> retrieveAllGroups() {
-        return executor.executeRows(selectAllGroupStatement.bind())
-            .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP))))
-            .distinct();
+            .map(row -> EventDeadLetters.InsertionId.of(row.getUUID(INSERTION_ID)));
     }
 
     private Event deserializeEvent(String serializedEvent) {
diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
index 72e354c..6c4095c 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
@@ -36,7 +36,7 @@ public interface CassandraEventDeadLettersModule {
                 SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))
         .statement(statement -> statement
             .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text())
-            .addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid())
+            .addClusteringColumn(CassandraEventDeadLettersTable.INSERTION_ID, DataType.uuid())
             .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text()))
         .table(CassandraEventDeadLettersGroupTable.TABLE_NAME)
         .comment("Projection table for retrieving groups for all failed events")
diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
index 2418ffe..cbf272c 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
@@ -24,6 +24,6 @@ public interface CassandraEventDeadLettersTable {
     String TABLE_NAME = "event_dead_letters";
 
     String GROUP = "group";
-    String EVENT_ID = "eventId";
+    String INSERTION_ID = "insertionId";
     String EVENT = "event";
 }
diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
index e5da4df..2fba1d8 100644
--- a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
+++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
@@ -22,11 +22,11 @@ package org.apache.james.mailbox.events;
 import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_1;
 import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_2;
 import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_3;
-import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_1;
-import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_2;
-import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_3;
 import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A;
 import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_1;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_2;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_3;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
@@ -53,12 +53,12 @@ class CassandraEventDeadLettersDAOTest {
 
     @Test
     void removeEventShouldSucceededWhenRemoveStoredEvent() {
-        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block();
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
 
-        cassandraEventDeadLettersDAO.removeEvent(GROUP_A, EVENT_ID_1).block();
+        cassandraEventDeadLettersDAO.removeEvent(GROUP_A, INSERTION_ID_1).block();
 
         assertThat(cassandraEventDeadLettersDAO
-                .retrieveEventIdsWithGroup(GROUP_A)
+                .retrieveInsertionIdsWithGroup(GROUP_A)
                 .collectList().block())
             .isEmpty();
     }
@@ -66,39 +66,39 @@ class CassandraEventDeadLettersDAOTest {
     @Test
     void retrieveFailedEventShouldReturnEmptyWhenDefault() {
         assertThat(cassandraEventDeadLettersDAO
-                .retrieveFailedEvent(GROUP_A, EVENT_ID_1)
+                .retrieveFailedEvent(GROUP_A, INSERTION_ID_1)
                 .blockOptional().isPresent())
             .isFalse();
     }
 
     @Test
     void retrieveFailedEventShouldReturnStoredEvent() {
-        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block();
-        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block();
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();
 
         assertThat(cassandraEventDeadLettersDAO
-                .retrieveFailedEvent(GROUP_B, EVENT_ID_2)
+                .retrieveFailedEvent(GROUP_B, INSERTION_ID_2)
                 .blockOptional().get())
             .isEqualTo(EVENT_2);
     }
 
     @Test
-    void retrieveEventIdsWithGroupShouldReturnEmptyWhenDefault() {
+    void retrieveInsertionIdsWithGroupShouldReturnEmptyWhenDefault() {
         assertThat(cassandraEventDeadLettersDAO
-                .retrieveEventIdsWithGroup(GROUP_A)
+                .retrieveInsertionIdsWithGroup(GROUP_A)
                 .collectList().block())
             .isEmpty();
     }
 
     @Test
-    void retrieveEventIdsWithGroupShouldReturnStoredEventId() {
-        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1).block();
-        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block();
-        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block();
+    void retrieveInsertionIdsWithGroupShouldReturnStoredInsertionId() {
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3, INSERTION_ID_3).block();
 
         assertThat(cassandraEventDeadLettersDAO
-                .retrieveEventIdsWithGroup(GROUP_B)
+                .retrieveInsertionIdsWithGroup(GROUP_B)
                 .collectList().block())
-            .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+            .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org