You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/10 09:48:53 UTC

[1/2] james-project git commit: MAILBOX-373 EventDeadLetters API

Repository: james-project
Updated Branches:
  refs/heads/master 59c48ef84 -> 7e2a49a0d


MAILBOX-373 EventDeadLetters API


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9eacc577
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9eacc577
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9eacc577

Branch: refs/heads/master
Commit: 9eacc577840c4c67a564ef032909c2ffff3a7b37
Parents: 59c48ef
Author: tran tien duc <dt...@linagora.com>
Authored: Tue Jan 8 17:29:53 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Jan 10 16:47:53 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/EventDeadLetters.java  | 37 ++++++++++++++++++++
 1 file changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/9eacc577/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
new file mode 100644
index 0000000..238533b
--- /dev/null
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import org.apache.james.mailbox.Event;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+interface EventDeadLetters { // Per-group store of events whose delivery failed; expected behaviour is pinned by EventDeadLettersContract.
+    Mono<Void> store(Group registeredGroup, Event failDeliveredEvent); // keeps the event under the group
+
+    Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId); // drops the group's event having that id; a missing id is not an error
+
+    Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId); // the group's event having that id, empty when absent
+
+    Flux<Event.EventId> failedEventIds(Group registeredGroup); // ids of the events kept for the group
+
+    Flux<Group> groupsWithFailedEvents(); // groups that have events kept
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/2] james-project git commit: MAILBOX-373 EventDeadLetters Contract & Memory impl

Posted by bt...@apache.org.
MAILBOX-373 EventDeadLetters Contract & Memory impl


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7e2a49a0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7e2a49a0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7e2a49a0

Branch: refs/heads/master
Commit: 7e2a49a0de899a2c39d4e8b174b644bd0151f8fa
Parents: 9eacc57
Author: tran tien duc <dt...@linagora.com>
Authored: Tue Jan 8 17:30:36 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Jan 10 16:47:54 2019 +0700

----------------------------------------------------------------------
 .../events/EventDeadLettersContract.java        | 402 +++++++++++++++++++
 .../mailbox/events/MemoryEventDeadLetters.java  |  90 +++++
 .../events/MemoryEventDeadLettersTest.java      |  44 ++
 3 files changed, 536 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/7e2a49a0/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
new file mode 100644
index 0000000..d70395a
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
@@ -0,0 +1,402 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+interface EventDeadLettersContract {
+
+    class Group0 extends Group{} // Group0..Group9: ten distinct Group types, collected in CONCURRENT_GROUPS for the concurrency tests
+    class Group1 extends Group{}
+    class Group2 extends Group{}
+    class Group3 extends Group{}
+    class Group4 extends Group{}
+    class Group5 extends Group{}
+    class Group6 extends Group{}
+    class Group7 extends Group{}
+    class Group8 extends Group{}
+    class Group9 extends Group{}
+
+    static ImmutableMap<Integer, Group> concurrentGroups() { // position in CONCURRENT_GROUPS -> group found at that position
+        return IntStream.range(0, CONCURRENT_GROUPS.size()).boxed()
+            .collect(Guavate.toImmutableMap(position -> position, position -> CONCURRENT_GROUPS.get(position)));
+    }
+
+    static Event event(Event.EventId eventId) { // MailboxAdded built from the shared fixture fields below and the given id
+        return new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, eventId);
+    }
+    // Settings of the concurrency tests: THREAD_COUNT threads, each with its own group from CONCURRENT_GROUPS.
+    List<Group> CONCURRENT_GROUPS = ImmutableList.of(new Group0(), new Group1(), new Group2(), new Group3(), new Group4(), new Group5(),
+        new Group6(), new Group7(), new Group8(), new Group9());
+    Duration RUN_SUCCESSFULLY_IN = Duration.ofSeconds(5);
+    int THREAD_COUNT = 10;
+    int OPERATION_COUNT = 50;
+    // Shared event fields, then three sample events built from them that differ only by their event id.
+    MailboxPath MAILBOX_PATH = new MailboxPath(MailboxConstants.USER_NAMESPACE, "user", "mailboxName");
+    User USER = User.fromUsername("user");
+    MailboxSession.SessionId SESSION_ID = MailboxSession.SessionId.of(235);
+    TestId MAILBOX_ID = TestId.of(563);
+    Event.EventId EVENT_ID_1 = Event.EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4");
+    Event.EventId EVENT_ID_2 = Event.EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b5");
+    Event.EventId EVENT_ID_3 = Event.EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b6");
+    MailboxListener.MailboxAdded EVENT_1 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_1);
+    MailboxListener.MailboxAdded EVENT_2 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_2);
+    MailboxListener.MailboxAdded EVENT_3 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_3);
+    // Two groups for the per-group tests, plus typed nulls passed by the argument-validation tests.
+    Group GROUP_A = new EventBusContract.GroupA();
+    Group GROUP_B = new EventBusContract.GroupB();
+    Group NULL_GROUP = null;
+    Event NULL_EVENT = null;
+    Event.EventId NULL_EVENT_ID = null;
+    // Implementation under test, supplied by the concrete test class.
+    EventDeadLetters eventDeadLetters();
+
+    default Stream<Event.EventId> allEventIds() {
+        // Failed event ids of every group that has failed events, flattened into a single stream.
+        EventDeadLetters deadLetters = eventDeadLetters();
+        return deadLetters.groupsWithFailedEvents()
+            .flatMap(group -> deadLetters.failedEventIds(group))
+            .toStream();
+    }
+
+    interface StoreContract extends EventDeadLettersContract {
+        // store(): null arguments are rejected at call time, re-storing an event is a no-op, concurrent stores are all kept.
+        @Test
+        default void storeShouldThrowWhenNullGroup() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, EVENT_1))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void storeShouldThrowWhenNullEvent() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.store(GROUP_A, NULL_EVENT))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void storeShouldThrowWhenBothGroupAndEventAreNull() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, NULL_EVENT))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void storeShouldStoreGroupWithCorrespondingEvent() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block())
+                .isEqualTo(EVENT_1);
+        }
+
+        @Test
+        default void storeShouldIgnoreStoreDuplicatedEventsPerGroup() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+
+            assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
+                .containsExactly(EVENT_ID_1);
+        }
+
+        @Test
+        default void storeShouldKeepConsistencyWhenConcurrentStore() throws Exception {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+            // One dedicated group per thread; the ids each thread stores are recorded under its thread number.
+            ImmutableMap<Integer, Group> groups = concurrentGroups();
+            Multimap<Integer, Event.EventId> storedEventIds = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+            // Await each store with block(), as the other tests do: a Mono is not required to run until subscribed.
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, step) -> {
+                    Event.EventId eventId = Event.EventId.random();
+                    storedEventIds.put(threadNumber, eventId);
+                    eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
+                })
+                .threadCount(THREAD_COUNT)
+                .operationCount(OPERATION_COUNT)
+                .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+            // Every group must expose the same ids as the ones its thread stored.
+            groups.forEach((groupId, group) -> {
+                Group storedGroup = groups.get(groupId);
+                assertThat(eventDeadLetters.failedEventIds(storedGroup).toStream())
+                    .hasSameElementsAs(storedEventIds.get(groupId));
+            });
+        }
+    }
+
+    interface RemoveContract extends EventDeadLettersContract {
+        // remove(): null arguments are rejected at call time, only the matching event is dropped, a missing id is tolerated.
+        @Test
+        default void removeShouldThrowWhenGroupIsNull() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, EVENT_ID_1))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void removeShouldThrowWhenEventIdIsNull() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.remove(GROUP_A, NULL_EVENT_ID))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void removeShouldThrowWhenBothGroupAndEventIdAreNull() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, NULL_EVENT_ID))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void removeShouldRemoveMatched() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+
+            eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block();
+
+            assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block())
+                .isNull();
+        }
+
+        @Test
+        default void removeShouldKeepNonMatched() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+            eventDeadLetters.store(GROUP_A, EVENT_3).block();
+
+            eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block();
+
+            assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
+                .containsOnly(EVENT_ID_2, EVENT_ID_3);
+        }
+
+        @Test
+        default void removeShouldNotThrowWhenNoMatched() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+
+            assertThatCode(() -> eventDeadLetters.remove(GROUP_A, EVENT_2.getEventId()).block())
+                .doesNotThrowAnyException();
+        }
+
+        @Test
+        default void removeShouldKeepConsistencyWhenConcurrentRemove() throws Exception {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            ImmutableMap<Integer, Group> groups = concurrentGroups();
+            ConcurrentHashMap<Integer, Event.EventId> storedEventIds = new ConcurrentHashMap<>();
+            // Phase 1: store one event per operation; block() ensures each store has completed before phase 2 starts.
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, step) -> {
+                    int operationIndex = threadNumber * OPERATION_COUNT + step;
+                    Event.EventId eventId = Event.EventId.random();
+                    storedEventIds.put(operationIndex, eventId);
+                    eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
+                })
+                .threadCount(THREAD_COUNT)
+                .operationCount(OPERATION_COUNT)
+                .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+            // Phase 2: remove every stored event; block() because an unsubscribed Mono is not required to run.
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, step) -> {
+                    int operationIndex = threadNumber * OPERATION_COUNT + step;
+                    eventDeadLetters.remove(groups.get(threadNumber), storedEventIds.get(operationIndex)).block();
+                })
+                .threadCount(THREAD_COUNT)
+                .operationCount(OPERATION_COUNT)
+                .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+            assertThat(allEventIds())
+                .isEmpty();
+        }
+    }
+
+    interface FailedEventContract extends EventDeadLettersContract {
+        // failedEvent(): rejects null arguments, finds an event by group and id, and leaves the stored events untouched.
+        @Test
+        default void failedEventShouldThrowWhenGroupIsNull() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            assertThatThrownBy(() -> testee.failedEvent(NULL_GROUP, EVENT_ID_1))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void failedEventShouldThrowWhenEventIdIsNull() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            assertThatThrownBy(() -> testee.failedEvent(GROUP_A, NULL_EVENT_ID))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void failedEventShouldThrowWhenBothGroupAndEventIdAreNull() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            assertThatThrownBy(() -> testee.failedEvent(NULL_GROUP, NULL_EVENT_ID))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void failedEventShouldReturnEmptyWhenNotFound() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            testee.store(GROUP_A, EVENT_1).block();
+            testee.store(GROUP_A, EVENT_2).block();
+
+            Event lookedUp = testee.failedEvent(GROUP_A, EVENT_ID_3).block();
+            assertThat(lookedUp).isNull();
+        }
+
+        @Test
+        default void failedEventShouldReturnEventWhenContains() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            testee.store(GROUP_A, EVENT_1).block();
+            testee.store(GROUP_A, EVENT_2).block();
+
+            Event lookedUp = testee.failedEvent(GROUP_A, EVENT_1.getEventId()).block();
+            assertThat(lookedUp).isEqualTo(EVENT_1);
+        }
+
+        @Test
+        default void failedEventShouldNotRemoveEvent() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            testee.store(GROUP_A, EVENT_1).block();
+            testee.store(GROUP_A, EVENT_2).block();
+            testee.store(GROUP_A, EVENT_3).block();
+
+            testee.failedEvent(GROUP_A, EVENT_1.getEventId()).block();
+
+            assertThat(allEventIds())
+                .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+        }
+    }
+
+    interface FailedEventsContract extends EventDeadLettersContract {
+        // failedEventIds(): rejects a null group, lists only the ids of the given group, and does not remove anything.
+        @Test
+        default void failedEventsShouldThrowWhenGroupIsNull() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            assertThatThrownBy(() -> testee.failedEventIds(NULL_GROUP))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void failedEventsByGroupShouldReturnEmptyWhenNonMatch() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            testee.store(GROUP_A, EVENT_1).block();
+            testee.store(GROUP_A, EVENT_2).block();
+            testee.store(GROUP_A, EVENT_3).block();
+
+            Stream<Event.EventId> idsOfGroupB = testee.failedEventIds(GROUP_B).toStream();
+            assertThat(idsOfGroupB).isEmpty();
+        }
+
+        @Test
+        default void failedEventsByGroupShouldReturnAllEventsCorrespondingToGivenGroup() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            testee.store(GROUP_A, EVENT_1).block();
+            testee.store(GROUP_A, EVENT_2).block();
+            testee.store(GROUP_B, EVENT_3).block();
+
+            Stream<Event.EventId> idsOfGroupA = testee.failedEventIds(GROUP_A).toStream();
+            assertThat(idsOfGroupA).containsOnly(EVENT_ID_1, EVENT_ID_2);
+        }
+
+        @Test
+        default void failedEventsByGroupShouldNotRemoveEvents() {
+            EventDeadLetters testee = eventDeadLetters();
+
+            testee.store(GROUP_A, EVENT_1).block();
+            testee.store(GROUP_A, EVENT_2).block();
+            testee.store(GROUP_B, EVENT_3).block();
+
+            testee.failedEventIds(GROUP_A).toStream();
+
+            assertThat(allEventIds())
+                .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+        }
+    }
+
+    interface GroupsWithFailedEventsContract extends EventDeadLettersContract { // groupsWithFailedEvents(): lists the groups events were stored for
+        @Test
+        default void groupsWithFailedEventsShouldReturnAllStoredGroups() {
+            EventDeadLetters testee = eventDeadLetters();
+            testee.store(GROUP_A, EVENT_1).block();
+            testee.store(GROUP_B, EVENT_1).block();
+            Stream<Group> failedGroups = testee.groupsWithFailedEvents().toStream();
+            assertThat(failedGroups)
+                .containsOnly(GROUP_A, GROUP_B);
+        }
+
+        @Test
+        default void groupsWithFailedEventsShouldReturnEmptyWhenNoStored() {
+            EventDeadLetters testee = eventDeadLetters();
+            Stream<Group> failedGroups = testee.groupsWithFailedEvents().toStream();
+            assertThat(failedGroups).isEmpty();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/7e2a49a0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
new file mode 100644
index 0000000..8464ed9
--- /dev/null
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import org.apache.james.mailbox.Event;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+public class MemoryEventDeadLetters implements EventDeadLetters {
+    // In-memory EventDeadLetters backed by a synchronized multimap; compound accesses are made atomic explicitly.
+    private static final String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null";
+    private static final String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null";
+    private static final String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId cannot be null";
+
+    private final Multimap<Group, Event> deadLetters;
+
+    MemoryEventDeadLetters() {
+        this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+    }
+
+    @Override
+    public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) {
+        Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+        Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
+        // subscribeWith(MonoProcessor) runs the insertion immediately; the returned Mono only replays its outcome.
+        return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent))
+            .subscribeWith(MonoProcessor.create())
+            .then();
+    }
+
+    @Override
+    public Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId) {
+        Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+        Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+        // Lookup and removal share one critical section on the multimap so no concurrent writer can slip in between.
+        return Mono.fromRunnable(() -> {
+            synchronized (deadLetters) {
+                deadLetters.get(registeredGroup).stream().filter(event -> event.getEventId().equals(failDeliveredEventId))
+                    .findFirst().ifPresent(event -> deadLetters.remove(registeredGroup, event));
+            }
+        }).subscribeWith(MonoProcessor.create()).then();
+    }
+
+    @Override
+    public Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId) {
+        Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+        Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+        // On subscription, emit a toArray() copy (one call on the synchronized view) instead of iterating the live view.
+        return Flux.defer(() -> Flux.fromArray(deadLetters.get(registeredGroup).toArray(new Event[0])))
+            .filter(event -> event.getEventId().equals(failDeliveredEventId))
+            .next();
+    }
+
+    @Override
+    public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
+        Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+        // Ids are read from a copy of the group's events taken at subscription time, never from the live view.
+        return Flux.defer(() -> Flux.fromArray(deadLetters.get(registeredGroup).toArray(new Event[0])))
+            .map(Event::getEventId);
+    }
+
+    @Override
+    public Flux<Group> groupsWithFailedEvents() {
+        return Flux.defer(() -> Flux.fromArray(deadLetters.keySet().toArray(new Group[0])));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7e2a49a0/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java
new file mode 100644
index 0000000..ab2c8a4
--- /dev/null
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java
@@ -0,0 +1,44 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventsContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.GroupsWithFailedEventsContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.RemoveContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.StoreContract;
+
+import org.junit.jupiter.api.BeforeEach;
+
+class MemoryEventDeadLettersTest implements StoreContract, RemoveContract, FailedEventContract, FailedEventsContract,
+    GroupsWithFailedEventsContract {
+    // Runs every EventDeadLetters contract against the in-memory implementation.
+    private MemoryEventDeadLetters testee;
+
+    @BeforeEach
+    void setUp() {
+        this.testee = new MemoryEventDeadLetters(); // new, empty instance before each test
+    }
+
+    @Override
+    public EventDeadLetters eventDeadLetters() {
+        return this.testee;
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org