You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/08/23 08:46:10 UTC
[james-project] 01/03: JAMES-3802 EventDeadLetters:: an API to clean all events of a group
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e8b6b51152fca8de2d8f9e0f9b96933e42f72c1d
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu Aug 11 13:19:23 2022 +0700
JAMES-3802 EventDeadLetters:: an API to clean all events of a group
---
.../org/apache/james/events/EventDeadLetters.java | 3 +-
.../james/events/EventDeadLettersContract.java | 94 ++++++++++++++++++++++
.../james/events/CassandraEventDeadLetters.java | 9 ++-
.../james/events/CassandraEventDeadLettersDAO.java | 13 +++
.../events/CassandraEventDeadLettersGroupDAO.java | 14 ++++
.../events/CassandraEventDeadLettersDAOTest.java | 14 ++++
.../CassandraEventDeadLettersGroupDAOTest.java | 11 +++
.../james/events/MemoryEventDeadLetters.java | 10 +++
8 files changed, 166 insertions(+), 2 deletions(-)
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java b/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java
index 8987e18b17..f8cfcea739 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java
@@ -81,7 +81,6 @@ public interface EventDeadLetters {
}
}
-
String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null";
String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null";
String FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL = "failDeliveredInsertionId cannot be null";
@@ -90,6 +89,8 @@ public interface EventDeadLetters {
Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId);
+ Mono<Void> remove(Group registeredGroup); // Bulk removal: drops every failed event stored for registeredGroup (the overload above removes a single insertion id).
+
Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId);
Flux<InsertionId> failedIds(Group registeredGroup);
diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
index 2b2270e96f..7ef08f4a9e 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
@@ -197,6 +197,14 @@ interface EventDeadLettersContract {
.isInstanceOf(IllegalArgumentException.class);
}
+ @Test
+ default void removeAllEventsOfAGroupShouldThrowWhenGroupIsNull() { // Contract: remove(Group) rejects a null group.
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP)) // no block(): the exception is expected from the call itself, not from subscribing
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
@Test
default void removeShouldThrowWhenInsertionIdIsNull() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -226,6 +234,43 @@ interface EventDeadLettersContract {
.isNull();
}
+ @Test
+ default void removeAllEventsOfAGroupShouldRemoveAllEventsOfThatGroup() { // Contract: after remove(Group), no failed insertion id remains for that group.
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ eventDeadLetters.store(GROUP_A, EVENT_2).block();
+
+ eventDeadLetters.remove(GROUP_A).block();
+
+ assertThat(eventDeadLetters.failedIds(GROUP_A).collectList().block())
+ .isEmpty();
+ }
+
+ @Test
+ default void removeAllEventsOfGroupAShouldRemoveThatGroup() { // Contract: a group whose events were all removed is no longer reported by groupsWithFailedEvents().
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+ eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ eventDeadLetters.store(GROUP_A, EVENT_2).block();
+
+ eventDeadLetters.remove(GROUP_A).block();
+
+ assertThat(eventDeadLetters.groupsWithFailedEvents().collectList().block())
+ .isEmpty();
+ }
+
+ @Test
+ default void removeAllEventsOfGroupAShouldNotRemoveGroupB() { // Contract: removing GROUP_A's events leaves GROUP_B listed among the groups with failed events.
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+ eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ eventDeadLetters.store(GROUP_B, EVENT_2).block();
+
+ eventDeadLetters.remove(GROUP_A).block();
+
+ assertThat(eventDeadLetters.groupsWithFailedEvents().collectList().block())
+ .containsOnly(GROUP_B);
+ }
+
@Test
default void removeShouldKeepNonMatched() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -240,6 +285,20 @@ interface EventDeadLettersContract {
.containsOnly(insertionId2, insertionId3);
}
+ @Test
+ default void removeAllEventsOfAGroupShouldKeepNonMatched() { // Contract: events stored under another group survive the group-wide removal.
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ eventDeadLetters.store(GROUP_A, EVENT_2).block();
+ InsertionId insertionId3 = eventDeadLetters.store(GROUP_B, EVENT_3).block(); // only GROUP_B's id is kept; it is the one expected to remain
+
+ eventDeadLetters.remove(GROUP_A).block();
+
+ assertThat(eventDeadLetters.failedIds(GROUP_B).toStream())
+ .containsOnly(insertionId3);
+ }
+
@Test
default void removeShouldNotThrowWhenNoInsertionIdMatched() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -260,6 +319,16 @@ interface EventDeadLettersContract {
.doesNotThrowAnyException();
}
+ @Test
+ default void removeAllEventsOfAGroupShouldNotThrowWhenNoGroupMatched() { // Contract: removing a group that holds no event completes without error.
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ eventDeadLetters.store(GROUP_A, EVENT_1).block(); // only GROUP_A has an event; GROUP_B is removed below
+
+ assertThatCode(() -> eventDeadLetters.remove(GROUP_B).block())
+ .doesNotThrowAnyException();
+ }
+
@Test
default void removeShouldKeepConsistencyWhenConcurrentRemove() throws Exception {
EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -291,6 +360,31 @@ interface EventDeadLettersContract {
assertThat(allInsertionIds())
.isEmpty();
}
+
+ @Test
+ default void removeAllEventsOfAGroupShouldKeepConsistencyWhenConcurrentRemove() throws Exception { // Contract: concurrent group-wide removals leave no insertion id behind.
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ ImmutableMap<Integer, Group> groups = concurrentGroups(); // looked up by thread number in both phases below
+
+ ConcurrentTestRunner.builder() // phase 1: each thread stores random events under the group mapped to its thread number
+ .operation((threadNumber, step) -> {
+ Event.EventId eventId = Event.EventId.random();
+ eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
+ })
+ .threadCount(THREAD_COUNT)
+ .operationCount(OPERATION_COUNT)
+ .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+ ConcurrentTestRunner.builder() // phase 2: each thread repeatedly removes its own group, so later removals hit an already-emptied group
+ .operation((threadNumber, step) -> eventDeadLetters.remove(groups.get(threadNumber)).block())
+ .threadCount(THREAD_COUNT)
+ .operationCount(OPERATION_COUNT)
+ .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+ assertThat(allInsertionIds())
+ .isEmpty();
+ }
}
interface FailedEventContract extends EventDeadLettersContract {
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java
index e170221fd4..a319acfa03 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java
@@ -31,7 +31,6 @@ public class CassandraEventDeadLetters implements EventDeadLetters {
private final CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO;
private final CassandraEventDeadLettersGroupDAO cassandraEventDeadLettersGroupDAO;
-
@Inject
CassandraEventDeadLetters(CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO,
CassandraEventDeadLettersGroupDAO cassandraEventDeadLettersGroupDAO) {
@@ -58,6 +57,14 @@ public class CassandraEventDeadLetters implements EventDeadLetters {
return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredInsertionId);
}
+ @Override
+ public Mono<Void> remove(Group registeredGroup) { // Removes all dead-letter events of the group, then the group's own entry.
+ Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); // throws IllegalArgumentException on a null group
+
+ return cassandraEventDeadLettersDAO.removeEvents(registeredGroup) // events first...
+ .then(cassandraEventDeadLettersGroupDAO.deleteGroup(registeredGroup)); // ...then the group entry, chained after the events deletion
+ }
+
@Override
public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
index 9af07010c2..a46938539c 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
@@ -43,6 +43,7 @@ public class CassandraEventDeadLettersDAO {
private final EventSerializer eventSerializer;
private final PreparedStatement insertStatement;
private final PreparedStatement deleteStatement;
+ private final PreparedStatement deleteAllEventsOfAGroupStatement;
private final PreparedStatement selectEventStatement;
private final PreparedStatement selectEventIdsWithGroupStatement;
private final PreparedStatement containEventsStatement;
@@ -53,6 +54,7 @@ public class CassandraEventDeadLettersDAO {
this.eventSerializer = eventSerializer;
this.insertStatement = prepareInsertStatement(session);
this.deleteStatement = prepareDeleteStatement(session);
+ this.deleteAllEventsOfAGroupStatement = prepareDeleteAllEventsOfAGroupStatement(session);
this.selectEventStatement = prepareSelectEventStatement(session);
this.selectEventIdsWithGroupStatement = prepareSelectInsertionIdsWithGroupStatement(session);
this.containEventsStatement = prepareContainEventStatement(session);
@@ -73,6 +75,12 @@ public class CassandraEventDeadLettersDAO {
.build());
}
+ private PreparedStatement prepareDeleteAllEventsOfAGroupStatement(CqlSession session) { // Prepares a DELETE on TABLE_NAME filtered on the GROUP column only.
+ return session.prepare(deleteFrom(TABLE_NAME)
+ .whereColumn(GROUP).isEqualTo(bindMarker(GROUP)) // no insertion-id condition: every row matching the bound group is targeted
+ .build());
+ }
+
private PreparedStatement prepareSelectEventStatement(CqlSession session) {
return session.prepare(selectFrom(TABLE_NAME)
.column(EVENT)
@@ -108,6 +116,11 @@ public class CassandraEventDeadLettersDAO {
.setUuid(INSERTION_ID, failedInsertionId.getId()));
}
+ Mono<Void> removeEvents(Group group) { // Runs the group-wide delete statement for the given group.
+ return executor.executeVoid(deleteAllEventsOfAGroupStatement.bind()
+ .setString(GROUP, group.asString())); // the group is bound in its string form
+ }
+
Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) {
return executor.executeSingleRow(selectEventStatement.bind()
.setString(GROUP, group.asString())
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
index 80747f7dd0..2e2e067b9d 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
@@ -20,6 +20,7 @@
package org.apache.james.events;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static org.apache.james.events.tables.CassandraEventDeadLettersGroupTable.GROUP;
@@ -40,12 +41,14 @@ public class CassandraEventDeadLettersGroupDAO {
private final CassandraAsyncExecutor executor;
private final PreparedStatement insertStatement;
private final PreparedStatement selectAllStatement;
+ private final PreparedStatement deleteStatement;
@Inject
CassandraEventDeadLettersGroupDAO(CqlSession session) {
this.executor = new CassandraAsyncExecutor(session);
this.insertStatement = prepareInsertStatement(session);
this.selectAllStatement = prepareSelectStatement(session);
+ this.deleteStatement = prepareDeleteStatement(session);
}
private PreparedStatement prepareInsertStatement(CqlSession session) {
@@ -60,6 +63,12 @@ public class CassandraEventDeadLettersGroupDAO {
.build());
}
+ private PreparedStatement prepareDeleteStatement(CqlSession session) { // Prepares a DELETE on the group table for the row whose GROUP column equals the bound value.
+ return session.prepare(deleteFrom(TABLE_NAME)
+ .whereColumn(GROUP).isEqualTo(bindMarker(GROUP))
+ .build());
+ }
+
Mono<Void> storeGroup(Group group) {
return executor.executeVoid(insertStatement.bind()
.setString(GROUP, group.asString()));
@@ -69,4 +78,9 @@ public class CassandraEventDeadLettersGroupDAO {
return executor.executeRows(selectAllStatement.bind())
.map(Throwing.function(row -> Group.deserialize(row.getString(GROUP))));
}
+
+ Mono<Void> deleteGroup(Group group) { // Deletes the stored entry for the given group; counterpart of storeGroup.
+ return executor.executeVoid(deleteStatement.bind()
+ .setString(GROUP, group.asString())); // same string form that storeGroup binds
+ }
}
diff --git a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
index ffa7cd3783..9f9d205b29 100644
--- a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
+++ b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
@@ -59,6 +59,20 @@ class CassandraEventDeadLettersDAOTest {
.isEmpty();
}
+ @Test
+ void removeAllEventsOfAGroupShouldWork() { // removeEvents(group) must leave no insertion id for that group.
+ cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); // same event stored under three distinct insertion ids
+ cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_2).block();
+ cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_3).block();
+
+ cassandraEventDeadLettersDAO.removeEvents(GROUP_A).block();
+
+ assertThat(cassandraEventDeadLettersDAO
+ .retrieveInsertionIdsWithGroup(GROUP_A)
+ .collectList().block())
+ .isEmpty();
+ }
+
@Test
void retrieveFailedEventShouldReturnEmptyWhenDefault() {
assertThat(cassandraEventDeadLettersDAO
diff --git a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
index eda7fdc206..3b8c3a6c26 100644
--- a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
+++ b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
@@ -57,4 +57,15 @@ class CassandraEventDeadLettersGroupDAOTest {
.collectList().block())
.containsOnly(GROUP_A, GROUP_B);
}
+
+ @Test
+ void deleteGroupShouldOnlyDeleteMatchedGroup() { // deleteGroup must remove the given group and leave the other stored group in place.
+ GROUP_DAO.storeGroup(GROUP_A).block();
+ GROUP_DAO.storeGroup(GROUP_B).block();
+
+ GROUP_DAO.deleteGroup(GROUP_A).block();
+
+ assertThat(GROUP_DAO.retrieveAllGroups().collectList().block())
+ .containsOnly(GROUP_B);
+ }
}
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java b/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java
index b3df6dca3d..3a1c1da729 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java
@@ -58,6 +58,16 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
}
}
+ @Override
+ public Mono<Void> remove(Group registeredGroup) { // Clears every dead-letter entry held in memory for the group.
+ Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); // throws IllegalArgumentException on a null group
+
+ synchronized (deadLetters) { // guards the shared deadLetters structure while it is mutated
+ deadLetters.row(registeredGroup).clear(); // NOTE(review): runs when remove() is called, not when the returned Mono is subscribed - confirm this is intended
+ return Mono.empty();
+ }
+ }
+
@Override
public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org