You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/10 09:48:53 UTC
[1/2] james-project git commit: MAILBOX-373 EventDeadLetters API
Repository: james-project
Updated Branches:
refs/heads/master 59c48ef84 -> 7e2a49a0d
MAILBOX-373 EventDeadLetters API
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9eacc577
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9eacc577
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9eacc577
Branch: refs/heads/master
Commit: 9eacc577840c4c67a564ef032909c2ffff3a7b37
Parents: 59c48ef
Author: tran tien duc <dt...@linagora.com>
Authored: Tue Jan 8 17:29:53 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Jan 10 16:47:53 2019 +0700
----------------------------------------------------------------------
.../james/mailbox/events/EventDeadLetters.java | 37 ++++++++++++++++++++
1 file changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/9eacc577/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
new file mode 100644
index 0000000..238533b
--- /dev/null
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import org.apache.james.mailbox.Event;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Storage for events whose delivery to a registered listener {@link Group} failed.
+ *
+ * Failed events are kept per group and addressed by their {@link Event.EventId}.
+ * EventDeadLettersContract expects every method that takes a group, an event or an
+ * event id to throw IllegalArgumentException when one of them is null.
+ */
+interface EventDeadLetters {
+ /**
+  * Records failDeliveredEvent as a failed delivery for registeredGroup.
+  * Per the contract tests, storing the same event several times for one group
+  * leaves a single entry for it.
+  */
+ Mono<Void> store(Group registeredGroup, Event failDeliveredEvent);
+
+ /**
+  * Removes the event identified by failDeliveredEventId from registeredGroup.
+  * Per the contract tests, removing an id that is not stored must not fail.
+  */
+ Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId);
+
+ /**
+  * Looks up one failed event of registeredGroup by id without removing it.
+  * Per the contract tests, the Mono completes empty when no such event is stored.
+  */
+ Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId);
+
+ /**
+  * Lists the ids of the failed events stored for registeredGroup, without removing them.
+  * Per the contract tests, the Flux is empty for a group with no stored event.
+  */
+ Flux<Event.EventId> failedEventIds(Group registeredGroup);
+
+ /**
+  * Lists the groups that currently have at least one stored failed event.
+  */
+ Flux<Group> groupsWithFailedEvents();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[2/2] james-project git commit: MAILBOX-373 EventDeadLetters Contract
& Memory impl
Posted by bt...@apache.org.
MAILBOX-373 EventDeadLetters Contract & Memory impl
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7e2a49a0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7e2a49a0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7e2a49a0
Branch: refs/heads/master
Commit: 7e2a49a0de899a2c39d4e8b174b644bd0151f8fa
Parents: 9eacc57
Author: tran tien duc <dt...@linagora.com>
Authored: Tue Jan 8 17:30:36 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Jan 10 16:47:54 2019 +0700
----------------------------------------------------------------------
.../events/EventDeadLettersContract.java | 402 +++++++++++++++++++
.../mailbox/events/MemoryEventDeadLetters.java | 90 +++++
.../events/MemoryEventDeadLettersTest.java | 44 ++
3 files changed, 536 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/7e2a49a0/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
new file mode 100644
index 0000000..d70395a
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
@@ -0,0 +1,402 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+interface EventDeadLettersContract {
+
+ // Ten distinct Group subclasses; one instance of each is listed in CONCURRENT_GROUPS
+ // and used by the concurrency tests.
+ class Group0 extends Group{}
+ class Group1 extends Group{}
+ class Group2 extends Group{}
+ class Group3 extends Group{}
+ class Group4 extends Group{}
+ class Group5 extends Group{}
+ class Group6 extends Group{}
+ class Group7 extends Group{}
+ class Group8 extends Group{}
+ class Group9 extends Group{}
+
+ /**
+  * Indexes CONCURRENT_GROUPS by list position.
+  *
+  * @return an immutable map from index (0 .. size-1) to the group at that index
+  */
+ static ImmutableMap<Integer, Group> concurrentGroups() {
+     Stream<Integer> positions = IntStream.range(0, CONCURRENT_GROUPS.size()).boxed();
+
+     return positions.collect(
+         Guavate.toImmutableMap(Function.identity(), position -> CONCURRENT_GROUPS.get(position)));
+ }
+
+ /**
+  * Builds a MailboxAdded event from the shared session, user, path and mailbox id fixtures.
+  *
+  * @param eventId id the new event carries
+  * @return the freshly created event
+  */
+ static Event event(Event.EventId eventId) {
+     MailboxListener.MailboxAdded mailboxAdded =
+         new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, eventId);
+     return mailboxAdded;
+ }
+
+ // One instance of each GroupN class; concurrentGroups() keys them by list index.
+ List<Group> CONCURRENT_GROUPS = ImmutableList.of(new Group0(), new Group1(), new Group2(), new Group3(), new Group4(), new Group5(),
+ new Group6(), new Group7(), new Group8(), new Group9());
+ // Time budget handed to ConcurrentTestRunner.runSuccessfullyWithin in the concurrency tests.
+ Duration RUN_SUCCESSFULLY_IN = Duration.ofSeconds(5);
+ // Thread count for the concurrency tests; each thread number is used as a key into
+ // concurrentGroups(), so it should not exceed the number of CONCURRENT_GROUPS entries.
+ int THREAD_COUNT = 10;
+ // Value handed to ConcurrentTestRunner.operationCount; presumably operations per thread,
+ // given the "threadNumber * OPERATION_COUNT + step" indexing in the remove test.
+ int OPERATION_COUNT = 50;
+
+ // Fixture values shared by every MailboxAdded event built in these tests.
+ MailboxPath MAILBOX_PATH = new MailboxPath(MailboxConstants.USER_NAMESPACE, "user", "mailboxName");
+ User USER = User.fromUsername("user");
+ MailboxSession.SessionId SESSION_ID = MailboxSession.SessionId.of(235);
+ TestId MAILBOX_ID = TestId.of(563);
+ // Three fixed event ids that differ only in their last character, and one event per id.
+ Event.EventId EVENT_ID_1 = Event.EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4");
+ Event.EventId EVENT_ID_2 = Event.EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b5");
+ Event.EventId EVENT_ID_3 = Event.EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b6");
+ MailboxListener.MailboxAdded EVENT_1 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_1);
+ MailboxListener.MailboxAdded EVENT_2 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_2);
+ MailboxListener.MailboxAdded EVENT_3 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_3);
+
+ // Groups used by the single-threaded tests (classes declared in EventBusContract).
+ Group GROUP_A = new EventBusContract.GroupA();
+ Group GROUP_B = new EventBusContract.GroupB();
+ // Typed null references passed by the null-argument tests.
+ Group NULL_GROUP = null;
+ Event NULL_EVENT = null;
+ Event.EventId NULL_EVENT_ID = null;
+
+ /**
+  * Supplies the EventDeadLetters implementation under test; every contract test
+  * obtains its subject through this method.
+  */
+ EventDeadLetters eventDeadLetters();
+
+ /**
+  * Collects the ids of all failed events across every group that currently has some.
+  *
+  * @return a stream of event ids, obtained by listing the groups with failed events
+  *         and then the failed event ids of each of those groups
+  */
+ default Stream<Event.EventId> allEventIds() {
+     EventDeadLetters testee = eventDeadLetters();
+
+     return testee.groupsWithFailedEvents()
+         .flatMap(group -> testee.failedEventIds(group))
+         .toStream();
+ }
+
+ /**
+  * Contract for {@link EventDeadLetters#store(Group, Event)}: null arguments are rejected,
+  * stored events can be read back, duplicates per group are ignored, and concurrent
+  * stores do not lose events.
+  */
+ interface StoreContract extends EventDeadLettersContract {
+
+     @Test
+     default void storeShouldThrowWhenNullGroup() {
+         EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+         assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, EVENT_1))
+             .isInstanceOf(IllegalArgumentException.class);
+     }
+
+     @Test
+     default void storeShouldThrowWhenNullEvent() {
+         EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+         assertThatThrownBy(() -> eventDeadLetters.store(GROUP_A, NULL_EVENT))
+             .isInstanceOf(IllegalArgumentException.class);
+     }
+
+     @Test
+     default void storeShouldThrowWhenBothGroupAndEventAreNull() {
+         EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+         assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, NULL_EVENT))
+             .isInstanceOf(IllegalArgumentException.class);
+     }
+
+     @Test
+     default void storeShouldStoreGroupWithCorrespondingEvent() {
+         EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+         eventDeadLetters.store(GROUP_A, EVENT_1).block();
+         assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block())
+             .isEqualTo(EVENT_1);
+     }
+
+     @Test
+     default void storeShouldIgnoreStoreDuplicatedEventsPerGroup() {
+         EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+         eventDeadLetters.store(GROUP_A, EVENT_1).block();
+         eventDeadLetters.store(GROUP_A, EVENT_1).block();
+         eventDeadLetters.store(GROUP_A, EVENT_1).block();
+
+         assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
+             .containsExactly(EVENT_ID_1);
+     }
+
+     @Test
+     default void storeShouldKeepConsistencyWhenConcurrentStore() throws Exception {
+         EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+         ImmutableMap<Integer, Group> groups = concurrentGroups();
+         // Expected ids per thread number; synchronized because all workers write to it.
+         Multimap<Integer, Event.EventId> storedEventIds = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+
+         ConcurrentTestRunner.builder()
+             .operation((threadNumber, step) -> {
+                 Event.EventId eventId = Event.EventId.random();
+                 storedEventIds.put(threadNumber, eventId);
+                 // Block on the returned Mono: without a subscription a lazy implementation
+                 // would store nothing, and failures would never reach the runner.
+                 eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
+             })
+             .threadCount(THREAD_COUNT)
+             .operationCount(OPERATION_COUNT)
+             .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+         // Each thread wrote to its own group, so each group must hold exactly that thread's ids.
+         groups.forEach((groupId, group) ->
+             assertThat(eventDeadLetters.failedEventIds(group).toStream())
+                 .hasSameElementsAs(storedEventIds.get(groupId)));
+     }
+ }
+
+ interface RemoveContract extends EventDeadLettersContract {
+
+ @Test
+ default void removeShouldThrowWhenGroupIsNull() {
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, EVENT_ID_1))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ default void removeShouldThrowWhenEventIdIsNull() {
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ assertThatThrownBy(() -> eventDeadLetters.remove(GROUP_A, NULL_EVENT_ID))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ default void removeShouldThrowWhenBothGroupAndEventIdAreNull() {
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, NULL_EVENT_ID))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ default void removeShouldRemoveMatched() {
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ eventDeadLetters.store(GROUP_A, EVENT_2).block();
+
+ eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block();
+
+ assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block())
+ .isNull();
+ }
+
+ @Test
+ default void removeShouldKeepNonMatched() {
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ eventDeadLetters.store(GROUP_A, EVENT_2).block();
+ eventDeadLetters.store(GROUP_A, EVENT_3).block();
+
+ eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block();
+
+ assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
+ .containsOnly(EVENT_ID_2, EVENT_ID_3);
+ }
+
+ @Test
+ default void removeShouldNotThrowWhenNoMatched() {
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ eventDeadLetters.store(GROUP_A, EVENT_1).block();
+
+ assertThatCode(() -> eventDeadLetters.remove(GROUP_A, EVENT_2.getEventId()).block())
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ default void removeShouldKeepConsistencyWhenConcurrentRemove() throws Exception {
+ EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+ ImmutableMap<Integer, Group> groups = concurrentGroups();
+ ConcurrentHashMap<Integer, Event.EventId> storedEventIds = new ConcurrentHashMap<>();
+
+ ConcurrentTestRunner.builder()
+ .operation((threadNumber, step) -> {
+ int operationIndex = threadNumber * OPERATION_COUNT + step;
+ Event.EventId eventId = Event.EventId.random();
+ storedEventIds.put(operationIndex, eventId);
+ eventDeadLetters.store(groups.get(threadNumber), event(eventId)).subscribe();
+ })
+ .threadCount(THREAD_COUNT)
+ .operationCount(OPERATION_COUNT)
+ .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+ ConcurrentTestRunner.builder()
+ .operation((threadNumber, step) -> {
+ int operationIndex = threadNumber * OPERATION_COUNT + step;
+ eventDeadLetters.remove(groups.get(threadNumber), storedEventIds.get(operationIndex));
+ })
+ .threadCount(THREAD_COUNT)
+ .operationCount(OPERATION_COUNT)
+ .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
+
+ assertThat(allEventIds())
+ .isEmpty();
+ }
+ }
+
+ /**
+  * Contract for {@link EventDeadLetters#failedEvent(Group, Event.EventId)}: null arguments
+  * are rejected, unknown ids yield nothing, known ids yield the stored event, and reading
+  * does not remove anything.
+  */
+ interface FailedEventContract extends EventDeadLettersContract {
+
+     @Test
+     default void failedEventShouldThrowWhenGroupIsNull() {
+         EventDeadLetters testee = eventDeadLetters();
+
+         assertThatThrownBy(() -> testee.failedEvent(NULL_GROUP, EVENT_ID_1))
+             .isInstanceOf(IllegalArgumentException.class);
+     }
+
+     @Test
+     default void failedEventShouldThrowWhenEventIdIsNull() {
+         EventDeadLetters testee = eventDeadLetters();
+
+         assertThatThrownBy(() -> testee.failedEvent(GROUP_A, NULL_EVENT_ID))
+             .isInstanceOf(IllegalArgumentException.class);
+     }
+
+     @Test
+     default void failedEventShouldThrowWhenBothGroupAndEventIdAreNull() {
+         EventDeadLetters testee = eventDeadLetters();
+
+         assertThatThrownBy(() -> testee.failedEvent(NULL_GROUP, NULL_EVENT_ID))
+             .isInstanceOf(IllegalArgumentException.class);
+     }
+
+     @Test
+     default void failedEventShouldReturnEmptyWhenNotFound() {
+         EventDeadLetters testee = eventDeadLetters();
+         testee.store(GROUP_A, EVENT_1).block();
+         testee.store(GROUP_A, EVENT_2).block();
+
+         Event lookedUp = testee.failedEvent(GROUP_A, EVENT_ID_3).block();
+
+         assertThat(lookedUp)
+             .isNull();
+     }
+
+     @Test
+     default void failedEventShouldReturnEventWhenContains() {
+         EventDeadLetters testee = eventDeadLetters();
+         testee.store(GROUP_A, EVENT_1).block();
+         testee.store(GROUP_A, EVENT_2).block();
+
+         Event lookedUp = testee.failedEvent(GROUP_A, EVENT_1.getEventId()).block();
+
+         assertThat(lookedUp)
+             .isEqualTo(EVENT_1);
+     }
+
+     @Test
+     default void failedEventShouldNotRemoveEvent() {
+         EventDeadLetters testee = eventDeadLetters();
+         testee.store(GROUP_A, EVENT_1).block();
+         testee.store(GROUP_A, EVENT_2).block();
+         testee.store(GROUP_A, EVENT_3).block();
+
+         // Read one event, then check that all three are still listed.
+         testee.failedEvent(GROUP_A, EVENT_1.getEventId()).block();
+
+         assertThat(allEventIds())
+             .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+     }
+ }
+
+ /**
+  * Contract for {@link EventDeadLetters#failedEventIds(Group)}: a null group is rejected,
+  * only the ids of the requested group are listed, and listing does not remove anything.
+  */
+ interface FailedEventsContract extends EventDeadLettersContract {
+
+     @Test
+     default void failedEventsShouldThrowWhenGroupIsNull() {
+         EventDeadLetters testee = eventDeadLetters();
+
+         assertThatThrownBy(() -> testee.failedEventIds(NULL_GROUP))
+             .isInstanceOf(IllegalArgumentException.class);
+     }
+
+     @Test
+     default void failedEventsByGroupShouldReturnEmptyWhenNonMatch() {
+         EventDeadLetters testee = eventDeadLetters();
+         testee.store(GROUP_A, EVENT_1).block();
+         testee.store(GROUP_A, EVENT_2).block();
+         testee.store(GROUP_A, EVENT_3).block();
+
+         Stream<Event.EventId> idsOfOtherGroup = testee.failedEventIds(GROUP_B).toStream();
+
+         assertThat(idsOfOtherGroup)
+             .isEmpty();
+     }
+
+     @Test
+     default void failedEventsByGroupShouldReturnAllEventsCorrespondingToGivenGroup() {
+         EventDeadLetters testee = eventDeadLetters();
+         testee.store(GROUP_A, EVENT_1).block();
+         testee.store(GROUP_A, EVENT_2).block();
+         testee.store(GROUP_B, EVENT_3).block();
+
+         Stream<Event.EventId> idsOfGroupA = testee.failedEventIds(GROUP_A).toStream();
+
+         assertThat(idsOfGroupA)
+             .containsOnly(EVENT_ID_1, EVENT_ID_2);
+     }
+
+     @Test
+     default void failedEventsByGroupShouldNotRemoveEvents() {
+         EventDeadLetters testee = eventDeadLetters();
+         testee.store(GROUP_A, EVENT_1).block();
+         testee.store(GROUP_A, EVENT_2).block();
+         testee.store(GROUP_B, EVENT_3).block();
+
+         // List one group's ids, then check that every stored id is still present.
+         testee.failedEventIds(GROUP_A).toStream();
+
+         assertThat(allEventIds())
+             .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+     }
+ }
+
+ /**
+  * Contract for {@link EventDeadLetters#groupsWithFailedEvents()}: it lists every group
+  * that has a stored event and nothing when no event was stored.
+  */
+ interface GroupsWithFailedEventsContract extends EventDeadLettersContract {
+     @Test
+     default void groupsWithFailedEventsShouldReturnAllStoredGroups() {
+         EventDeadLetters testee = eventDeadLetters();
+         testee.store(GROUP_A, EVENT_1).block();
+         testee.store(GROUP_B, EVENT_1).block();
+
+         Stream<Group> groups = testee.groupsWithFailedEvents().toStream();
+
+         assertThat(groups)
+             .containsOnly(GROUP_A, GROUP_B);
+     }
+
+     @Test
+     default void groupsWithFailedEventsShouldReturnEmptyWhenNoStored() {
+         EventDeadLetters testee = eventDeadLetters();
+
+         Stream<Group> groups = testee.groupsWithFailedEvents().toStream();
+
+         assertThat(groups).isEmpty();
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/7e2a49a0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
new file mode 100644
index 0000000..8464ed9
--- /dev/null
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import org.apache.james.mailbox.Event;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+public class MemoryEventDeadLetters implements EventDeadLetters {
+
+ private static final String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null";
+ private static final String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null";
+ private static final String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId cannot be null";
+
+ private final Multimap<Group, Event> deadLetters;
+
+ MemoryEventDeadLetters() {
+ this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+ }
+
+ @Override
+ public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) {
+ Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+ Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
+
+ return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent))
+ .subscribeWith(MonoProcessor.create())
+ .then();
+ }
+
+ @Override
+ public Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId) {
+ Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+ Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+
+ return Flux.fromIterable(deadLetters.get(registeredGroup))
+ .filter(event -> event.getEventId().equals(failDeliveredEventId))
+ .next()
+ .doOnNext(event -> deadLetters.remove(registeredGroup, event))
+ .subscribeWith(MonoProcessor.create())
+ .then();
+ }
+
+ @Override
+ public Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId) {
+ Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+ Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+
+ return Flux.fromIterable(deadLetters.get(registeredGroup))
+ .filter(event -> event.getEventId().equals(failDeliveredEventId))
+ .next();
+ }
+
+ @Override
+ public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
+ Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
+
+ return Flux.fromIterable(deadLetters.get(registeredGroup))
+ .map(Event::getEventId);
+ }
+
+ @Override
+ public Flux<Group> groupsWithFailedEvents() {
+ return Flux.fromIterable(deadLetters.keySet());
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/7e2a49a0/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java
new file mode 100644
index 0000000..ab2c8a4
--- /dev/null
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/MemoryEventDeadLettersTest.java
@@ -0,0 +1,44 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventsContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.GroupsWithFailedEventsContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.RemoveContract;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.StoreContract;
+
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Runs every EventDeadLetters contract against {@link MemoryEventDeadLetters},
+ * using a fresh instance for each test.
+ */
+class MemoryEventDeadLettersTest implements StoreContract, RemoveContract, FailedEventContract, FailedEventsContract,
+        GroupsWithFailedEventsContract {
+
+    private MemoryEventDeadLetters testee;
+
+    @BeforeEach
+    void setUp() {
+        this.testee = new MemoryEventDeadLetters();
+    }
+
+    @Override
+    public EventDeadLetters eventDeadLetters() {
+        return this.testee;
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org