You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/08/23 08:46:10 UTC

[james-project] 01/03: JAMES-3802 EventDeadLetters:: an API to clean all events of a group

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e8b6b51152fca8de2d8f9e0f9b96933e42f72c1d
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu Aug 11 13:19:23 2022 +0700

    JAMES-3802 EventDeadLetters:: an API to clean all events of a group
---
 .../org/apache/james/events/EventDeadLetters.java  |  3 +-
 .../james/events/EventDeadLettersContract.java     | 94 ++++++++++++++++++++++
 .../james/events/CassandraEventDeadLetters.java    |  9 ++-
 .../james/events/CassandraEventDeadLettersDAO.java | 13 +++
 .../events/CassandraEventDeadLettersGroupDAO.java  | 14 ++++
 .../events/CassandraEventDeadLettersDAOTest.java   | 14 ++++
 .../CassandraEventDeadLettersGroupDAOTest.java     | 11 +++
 .../james/events/MemoryEventDeadLetters.java       | 10 +++
 8 files changed, 166 insertions(+), 2 deletions(-)

diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java b/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java
index 8987e18b17..f8cfcea739 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventDeadLetters.java
@@ -81,7 +81,6 @@ public interface EventDeadLetters {
         }
     }
 
-
     String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null";
     String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null";
     String FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL = "failDeliveredInsertionId cannot be null";
@@ -90,6 +89,8 @@ public interface EventDeadLetters {
 
     Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId);
 
+    Mono<Void> remove(Group registeredGroup);
+
     Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId);
 
     Flux<InsertionId> failedIds(Group registeredGroup);
diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
index 2b2270e96f..7ef08f4a9e 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
@@ -197,6 +197,14 @@ interface EventDeadLettersContract {
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
+        @Test
+        default void removeAllEventsOfAGroupShouldThrowWhenGroupIsNull() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
         @Test
         default void removeShouldThrowWhenInsertionIdIsNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -226,6 +234,43 @@ interface EventDeadLettersContract {
                 .isNull();
         }
 
+        @Test
+        default void removeAllEventsOfAGroupShouldAllEventsOfThatGroup() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+
+            eventDeadLetters.remove(GROUP_A).block();
+
+            assertThat(eventDeadLetters.failedIds(GROUP_A).collectList().block())
+                .isEmpty();
+        }
+
+        @Test
+        default void removeAllEventsOfGroupAShouldRemoveThatGroup() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+
+            eventDeadLetters.remove(GROUP_A).block();
+
+            assertThat(eventDeadLetters.groupsWithFailedEvents().collectList().block())
+                .isEmpty();
+        }
+
+        @Test
+        default void removeAllEventsOfGroupAShouldNotRemoveGroupB() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_B, EVENT_2).block();
+
+            eventDeadLetters.remove(GROUP_A).block();
+
+            assertThat(eventDeadLetters.groupsWithFailedEvents().collectList().block())
+                .containsOnly(GROUP_B);
+        }
+
         @Test
         default void removeShouldKeepNonMatched() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -240,6 +285,20 @@ interface EventDeadLettersContract {
                 .containsOnly(insertionId2, insertionId3);
         }
 
+        @Test
+        default void removeAllEventsOfAGroupShouldKeepNonMatched() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+            InsertionId insertionId3 = eventDeadLetters.store(GROUP_B, EVENT_3).block();
+
+            eventDeadLetters.remove(GROUP_A).block();
+
+            assertThat(eventDeadLetters.failedIds(GROUP_B).toStream())
+                .containsOnly(insertionId3);
+        }
+
         @Test
         default void removeShouldNotThrowWhenNoInsertionIdMatched() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -260,6 +319,16 @@ interface EventDeadLettersContract {
                 .doesNotThrowAnyException();
         }
 
+        @Test
+        default void removeAllEventsOfAGroupShouldNotThrowWhenNoGroupMatched() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+
+            assertThatCode(() -> eventDeadLetters.remove(GROUP_B).block())
+                .doesNotThrowAnyException();
+        }
+
         @Test
         default void removeShouldKeepConsistencyWhenConcurrentRemove() throws Exception {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
@@ -291,6 +360,31 @@ interface EventDeadLettersContract {
             assertThat(allInsertionIds())
                 .isEmpty();
         }
+
+        @Test
+        default void removeAllEventsOfAGroupShouldKeepConsistencyWhenConcurrentRemove() throws Exception {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            ImmutableMap<Integer, Group> groups = concurrentGroups();
+
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, step) -> {
+                    Event.EventId eventId = Event.EventId.random();
+                    eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
+                })
+                .threadCount(THREAD_COUNT)
+                .operationCount(OPERATION_COUNT)
+                .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, step) -> eventDeadLetters.remove(groups.get(threadNumber)).block())
+                .threadCount(THREAD_COUNT)
+                .operationCount(OPERATION_COUNT)
+                .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+            assertThat(allInsertionIds())
+                .isEmpty();
+        }
     }
 
     interface FailedEventContract extends EventDeadLettersContract {
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java
index e170221fd4..a319acfa03 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLetters.java
@@ -31,7 +31,6 @@ public class CassandraEventDeadLetters implements EventDeadLetters {
     private final CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO;
     private final CassandraEventDeadLettersGroupDAO cassandraEventDeadLettersGroupDAO;
 
-
     @Inject
     CassandraEventDeadLetters(CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO,
                               CassandraEventDeadLettersGroupDAO cassandraEventDeadLettersGroupDAO) {
@@ -58,6 +57,14 @@ public class CassandraEventDeadLetters implements EventDeadLetters {
         return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredInsertionId);
     }
 
+    @Override
+    public Mono<Void> remove(Group registeredGroup) {
+        Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+
+        return cassandraEventDeadLettersDAO.removeEvents(registeredGroup)
+            .then(cassandraEventDeadLettersGroupDAO.deleteGroup(registeredGroup));
+    }
+
     @Override
     public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
index 9af07010c2..a46938539c 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
@@ -43,6 +43,7 @@ public class CassandraEventDeadLettersDAO {
     private final EventSerializer eventSerializer;
     private final PreparedStatement insertStatement;
     private final PreparedStatement deleteStatement;
+    private final PreparedStatement deleteAllEventsOfAGroupStatement;
     private final PreparedStatement selectEventStatement;
     private final PreparedStatement selectEventIdsWithGroupStatement;
     private final PreparedStatement containEventsStatement;
@@ -53,6 +54,7 @@ public class CassandraEventDeadLettersDAO {
         this.eventSerializer = eventSerializer;
         this.insertStatement = prepareInsertStatement(session);
         this.deleteStatement = prepareDeleteStatement(session);
+        this.deleteAllEventsOfAGroupStatement = prepareDeleteAllEventsOfAGroupStatement(session);
         this.selectEventStatement = prepareSelectEventStatement(session);
         this.selectEventIdsWithGroupStatement = prepareSelectInsertionIdsWithGroupStatement(session);
         this.containEventsStatement = prepareContainEventStatement(session);
@@ -73,6 +75,12 @@ public class CassandraEventDeadLettersDAO {
             .build());
     }
 
+    private PreparedStatement prepareDeleteAllEventsOfAGroupStatement(CqlSession session) {
+        return session.prepare(deleteFrom(TABLE_NAME)
+            .whereColumn(GROUP).isEqualTo(bindMarker(GROUP))
+            .build());
+    }
+
     private PreparedStatement prepareSelectEventStatement(CqlSession session) {
         return session.prepare(selectFrom(TABLE_NAME)
             .column(EVENT)
@@ -108,6 +116,11 @@ public class CassandraEventDeadLettersDAO {
                 .setUuid(INSERTION_ID, failedInsertionId.getId()));
     }
 
+    Mono<Void> removeEvents(Group group) {
+        return executor.executeVoid(deleteAllEventsOfAGroupStatement.bind()
+            .setString(GROUP, group.asString()));
+    }
+
     Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) {
         return executor.executeSingleRow(selectEventStatement.bind()
                 .setString(GROUP, group.asString())
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
index 80747f7dd0..2e2e067b9d 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
@@ -20,6 +20,7 @@
 package org.apache.james.events;
 
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
 import static org.apache.james.events.tables.CassandraEventDeadLettersGroupTable.GROUP;
@@ -40,12 +41,14 @@ public class CassandraEventDeadLettersGroupDAO {
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement insertStatement;
     private final PreparedStatement selectAllStatement;
+    private final PreparedStatement deleteStatement;
 
     @Inject
     CassandraEventDeadLettersGroupDAO(CqlSession session) {
         this.executor = new CassandraAsyncExecutor(session);
         this.insertStatement = prepareInsertStatement(session);
         this.selectAllStatement = prepareSelectStatement(session);
+        this.deleteStatement = prepareDeleteStatement(session);
     }
 
     private PreparedStatement prepareInsertStatement(CqlSession session) {
@@ -60,6 +63,12 @@ public class CassandraEventDeadLettersGroupDAO {
             .build());
     }
 
+    private PreparedStatement prepareDeleteStatement(CqlSession session) {
+        return session.prepare(deleteFrom(TABLE_NAME)
+            .whereColumn(GROUP).isEqualTo(bindMarker(GROUP))
+            .build());
+    }
+
     Mono<Void> storeGroup(Group group) {
         return executor.executeVoid(insertStatement.bind()
                 .setString(GROUP, group.asString()));
@@ -69,4 +78,9 @@ public class CassandraEventDeadLettersGroupDAO {
         return executor.executeRows(selectAllStatement.bind())
             .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP))));
     }
+
+    Mono<Void> deleteGroup(Group group) {
+        return executor.executeVoid(deleteStatement.bind()
+            .setString(GROUP, group.asString()));
+    }
 }
diff --git a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
index ffa7cd3783..9f9d205b29 100644
--- a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
+++ b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
@@ -59,6 +59,20 @@ class CassandraEventDeadLettersDAOTest {
             .isEmpty();
     }
 
+    @Test
+    void removeAllEventsOfAGroupShouldWork() {
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_2).block();
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_3).block();
+
+        cassandraEventDeadLettersDAO.removeEvents(GROUP_A).block();
+
+        assertThat(cassandraEventDeadLettersDAO
+            .retrieveInsertionIdsWithGroup(GROUP_A)
+            .collectList().block())
+            .isEmpty();
+    }
+
     @Test
     void retrieveFailedEventShouldReturnEmptyWhenDefault() {
         assertThat(cassandraEventDeadLettersDAO
diff --git a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
index eda7fdc206..3b8c3a6c26 100644
--- a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
+++ b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
@@ -57,4 +57,15 @@ class CassandraEventDeadLettersGroupDAOTest {
                 .collectList().block())
             .containsOnly(GROUP_A, GROUP_B);
     }
+
+    @Test
+    void deleteGroupShouldOnlyDeleteMatchedGroup() {
+        GROUP_DAO.storeGroup(GROUP_A).block();
+        GROUP_DAO.storeGroup(GROUP_B).block();
+
+        GROUP_DAO.deleteGroup(GROUP_A).block();
+
+        assertThat(GROUP_DAO.retrieveAllGroups().collectList().block())
+            .containsOnly(GROUP_B);
+    }
 }
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java b/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java
index b3df6dca3d..3a1c1da729 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/MemoryEventDeadLetters.java
@@ -58,6 +58,16 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
         }
     }
 
+    @Override
+    public Mono<Void> remove(Group registeredGroup) {
+        Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+
+        synchronized (deadLetters) {
+            deadLetters.row(registeredGroup).clear();
+            return Mono.empty();
+        }
+    }
+
     @Override
     public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org