You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/03/20 02:01:43 UTC
[james-project] 03/09: MAILBOX-373 Refactor CassandraEventDeadLetter
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e51448c6c6455ce76bdb865357386190ce2407f9
Author: datph <dp...@linagora.com>
AuthorDate: Wed Mar 13 17:22:15 2019 +0700
MAILBOX-373 Refactor CassandraEventDeadLetter
---
.../mailbox/events/CassandraEventDeadLetters.java | 21 ++++++-----
.../events/CassandraEventDeadLettersDAO.java | 44 ++++++++--------------
.../events/CassandraEventDeadLettersModule.java | 2 +-
.../tables/CassandraEventDeadLettersTable.java | 2 +-
.../events/CassandraEventDeadLettersDAOTest.java | 36 +++++++++---------
5 files changed, 46 insertions(+), 59 deletions(-)
diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
index b155b3b..39eae1e 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
@@ -39,35 +39,36 @@ public class CassandraEventDeadLetters implements EventDeadLetters {
}
@Override
- public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) {
+ public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, InsertionId insertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
+ Preconditions.checkArgument(insertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
- return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent)
+ return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent, insertionId)
.then(cassandraEventDeadLettersGroupDAO.storeGroup(registeredGroup));
}
@Override
- public Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId) {
+ public Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
- Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+ Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
- return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredEventId);
+ return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredInsertionId);
}
@Override
- public Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId) {
+ public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
- Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+ Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
- return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredEventId);
+ return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredInsertionId);
}
@Override
- public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
+ public Flux<InsertionId> failedIds(Group registeredGroup) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
- return cassandraEventDeadLettersDAO.retrieveEventIdsWithGroup(registeredGroup);
+ return cassandraEventDeadLettersDAO.retrieveInsertionIdsWithGroup(registeredGroup);
}
@Override
diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
index 2056f55..0afe2ba 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
@@ -25,8 +25,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT;
-import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT_ID;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.GROUP;
+import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.INSERTION_ID;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.TABLE_NAME;
import javax.inject.Inject;
@@ -36,7 +36,6 @@ import org.apache.james.event.json.EventSerializer;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
-import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -46,7 +45,6 @@ public class CassandraEventDeadLettersDAO {
private final EventSerializer eventSerializer;
private final PreparedStatement insertStatement;
private final PreparedStatement deleteStatement;
- private final PreparedStatement selectAllGroupStatement;
private final PreparedStatement selectEventStatement;
private final PreparedStatement selectEventIdsWithGroupStatement;
@@ -56,15 +54,14 @@ public class CassandraEventDeadLettersDAO {
this.eventSerializer = eventSerializer;
this.insertStatement = prepareInsertStatement(session);
this.deleteStatement = prepareDeleteStatement(session);
- this.selectAllGroupStatement = prepareSelectAllGroupStatement(session);
this.selectEventStatement = prepareSelectEventStatement(session);
- this.selectEventIdsWithGroupStatement = prepareSelectEventIdsWithGroupStatement(session);
+ this.selectEventIdsWithGroupStatement = prepareSelectInsertionIdsWithGroupStatement(session);
}
private PreparedStatement prepareInsertStatement(Session session) {
return session.prepare(insertInto(TABLE_NAME)
.value(GROUP, bindMarker(GROUP))
- .value(EVENT_ID, bindMarker(EVENT_ID))
+ .value(INSERTION_ID, bindMarker(INSERTION_ID))
.value(EVENT, bindMarker(EVENT)));
}
@@ -72,57 +69,46 @@ public class CassandraEventDeadLettersDAO {
return session.prepare(delete()
.from(TABLE_NAME)
.where(eq(GROUP, bindMarker(GROUP)))
- .and(eq(EVENT_ID, bindMarker(EVENT_ID))));
- }
-
- private PreparedStatement prepareSelectAllGroupStatement(Session session) {
- return session.prepare(select(GROUP)
- .from(TABLE_NAME));
+ .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
}
private PreparedStatement prepareSelectEventStatement(Session session) {
return session.prepare(select(EVENT)
.from(TABLE_NAME)
.where(eq(GROUP, bindMarker(GROUP)))
- .and(eq(EVENT_ID, bindMarker(EVENT_ID))));
+ .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
}
- private PreparedStatement prepareSelectEventIdsWithGroupStatement(Session session) {
- return session.prepare(select(EVENT_ID)
+ private PreparedStatement prepareSelectInsertionIdsWithGroupStatement(Session session) {
+ return session.prepare(select(INSERTION_ID)
.from(TABLE_NAME)
.where(eq(GROUP, bindMarker(GROUP))));
}
- Mono<Void> store(Group group, Event failedEvent) {
+ Mono<Void> store(Group group, Event failedEvent, EventDeadLetters.InsertionId insertionId) {
return executor.executeVoid(insertStatement.bind()
.setString(GROUP, group.asString())
- .setUUID(EVENT_ID, failedEvent.getEventId().getId())
+ .setUUID(INSERTION_ID, insertionId.getId())
.setString(EVENT, eventSerializer.toJson(failedEvent)));
}
- Mono<Void> removeEvent(Group group, Event.EventId failedEventId) {
+ Mono<Void> removeEvent(Group group, EventDeadLetters.InsertionId failedInsertionId) {
return executor.executeVoid(deleteStatement.bind()
.setString(GROUP, group.asString())
- .setUUID(EVENT_ID, failedEventId.getId()));
+ .setUUID(INSERTION_ID, failedInsertionId.getId()));
}
- Mono<Event> retrieveFailedEvent(Group group, Event.EventId failedEventId) {
+ Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) {
return executor.executeSingleRow(selectEventStatement.bind()
.setString(GROUP, group.asString())
- .setUUID(EVENT_ID, failedEventId.getId()))
+ .setUUID(INSERTION_ID, insertionId.getId()))
.map(row -> deserializeEvent(row.getString(EVENT)));
}
- Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) {
+ Flux<EventDeadLetters.InsertionId> retrieveInsertionIdsWithGroup(Group group) {
return executor.executeRows(selectEventIdsWithGroupStatement.bind()
.setString(GROUP, group.asString()))
- .map(row -> Event.EventId.of(row.getUUID(EVENT_ID)));
- }
-
- Flux<Group> retrieveAllGroups() {
- return executor.executeRows(selectAllGroupStatement.bind())
- .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP))))
- .distinct();
+ .map(row -> EventDeadLetters.InsertionId.of(row.getUUID(INSERTION_ID)));
}
private Event deserializeEvent(String serializedEvent) {
diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
index 72e354c..6c4095c 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
@@ -36,7 +36,7 @@ public interface CassandraEventDeadLettersModule {
SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))
.statement(statement -> statement
.addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text())
- .addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid())
+ .addClusteringColumn(CassandraEventDeadLettersTable.INSERTION_ID, DataType.uuid())
.addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text()))
.table(CassandraEventDeadLettersGroupTable.TABLE_NAME)
.comment("Projection table for retrieving groups for all failed events")
diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
index 2418ffe..cbf272c 100644
--- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
+++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
@@ -24,6 +24,6 @@ public interface CassandraEventDeadLettersTable {
String TABLE_NAME = "event_dead_letters";
String GROUP = "group";
- String EVENT_ID = "eventId";
+ String INSERTION_ID = "insertionId";
String EVENT = "event";
}
diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
index e5da4df..2fba1d8 100644
--- a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
+++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
@@ -22,11 +22,11 @@ package org.apache.james.mailbox.events;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_1;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_2;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_3;
-import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_1;
-import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_2;
-import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_3;
import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A;
import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_1;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_2;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_3;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.james.backends.cassandra.CassandraCluster;
@@ -53,12 +53,12 @@ class CassandraEventDeadLettersDAOTest {
@Test
void removeEventShouldSucceededWhenRemoveStoredEvent() {
- cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
- cassandraEventDeadLettersDAO.removeEvent(GROUP_A, EVENT_ID_1).block();
+ cassandraEventDeadLettersDAO.removeEvent(GROUP_A, INSERTION_ID_1).block();
assertThat(cassandraEventDeadLettersDAO
- .retrieveEventIdsWithGroup(GROUP_A)
+ .retrieveInsertionIdsWithGroup(GROUP_A)
.collectList().block())
.isEmpty();
}
@@ -66,39 +66,39 @@ class CassandraEventDeadLettersDAOTest {
@Test
void retrieveFailedEventShouldReturnEmptyWhenDefault() {
assertThat(cassandraEventDeadLettersDAO
- .retrieveFailedEvent(GROUP_A, EVENT_ID_1)
+ .retrieveFailedEvent(GROUP_A, INSERTION_ID_1)
.blockOptional().isPresent())
.isFalse();
}
@Test
void retrieveFailedEventShouldReturnStoredEvent() {
- cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block();
- cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block();
+ cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();
assertThat(cassandraEventDeadLettersDAO
- .retrieveFailedEvent(GROUP_B, EVENT_ID_2)
+ .retrieveFailedEvent(GROUP_B, INSERTION_ID_2)
.blockOptional().get())
.isEqualTo(EVENT_2);
}
@Test
- void retrieveEventIdsWithGroupShouldReturnEmptyWhenDefault() {
+ void retrieveInsertionIdsWithGroupShouldReturnEmptyWhenDefault() {
assertThat(cassandraEventDeadLettersDAO
- .retrieveEventIdsWithGroup(GROUP_A)
+ .retrieveInsertionIdsWithGroup(GROUP_A)
.collectList().block())
.isEmpty();
}
@Test
- void retrieveEventIdsWithGroupShouldReturnStoredEventId() {
- cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1).block();
- cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block();
- cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block();
+ void retrieveInsertionIdsWithGroupShouldReturnStoredInsertionId() {
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3, INSERTION_ID_3).block();
assertThat(cassandraEventDeadLettersDAO
- .retrieveEventIdsWithGroup(GROUP_B)
+ .retrieveInsertionIdsWithGroup(GROUP_B)
.collectList().block())
- .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+ .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org