You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/21 03:06:19 UTC

[1/5] james-project git commit: MAILBOX-372 Use RetryBackoffConfiguration in InVmEventDelivery

Repository: james-project
Updated Branches:
  refs/heads/master a760297cc -> cf996f1af


MAILBOX-372 Use RetryBackoffConfiguration in InVmEventDelivery


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/cf996f1a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/cf996f1a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/cf996f1a

Branch: refs/heads/master
Commit: cf996f1afce58a98b9a32bc5fc365e9112b9364f
Parents: 2d48bb0
Author: tran tien duc <dt...@linagora.com>
Authored: Wed Jan 16 17:13:16 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Mon Jan 21 10:04:49 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/delivery/InVmEventDelivery.java  | 16 ++++++++++------
 .../james/mailbox/events/InVMEventBusTest.java      |  2 +-
 .../events/delivery/InVmEventDeliveryTest.java      |  3 ++-
 .../james/modules/mailbox/DefaultEventModule.java   |  5 ++++-
 4 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/cf996f1a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 90e64c8..84a6a90 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -27,6 +27,7 @@ import javax.inject.Inject;
 
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.events.RetryBackoffConfiguration;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.TimeMetric;
 import org.slf4j.Logger;
@@ -46,17 +47,20 @@ public class InVmEventDelivery implements EventDelivery {
     }
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
-    private static final int MAX_RETRIES = 3;
-    private static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
     private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
-    private static final double DEFAULT_JITTER_FACTOR = 0.5;
 
     private final MetricFactory metricFactory;
+    private final RetryBackoffConfiguration retryBackoff;
 
     @Inject
     @VisibleForTesting
-    public InVmEventDelivery(MetricFactory metricFactory) {
+    public InVmEventDelivery(MetricFactory metricFactory, RetryBackoffConfiguration retryBackoff) {
         this.metricFactory = metricFactory;
+        this.retryBackoff = retryBackoff;
+    }
+
+    public InVmEventDelivery(MetricFactory metricFactory) {
+        this(metricFactory, RetryBackoffConfiguration.DEFAULT);
     }
 
     @Override
@@ -106,10 +110,10 @@ public class InVmEventDelivery implements EventDelivery {
         }
 
         return firstDelivery
-            .retryBackoff(MAX_RETRIES, FIRST_BACKOFF, MAX_BACKOFF, DEFAULT_JITTER_FACTOR)
+            .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
             .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
                 listenerName(mailboxListener),
-                MAX_RETRIES,
+                retryBackoff.getMaxRetries(),
                 eventName(event),
                 throwable))
             .then();

http://git-wip-us.apache.org/repos/asf/james-project/blob/cf996f1a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
index 8df4557..2ee725b 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
@@ -32,7 +32,7 @@ public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract,
     void setUp() {
         eventBus = new InVMEventBus(
             new InVmEventDelivery(
-                new NoopMetricFactory()));
+                new NoopMetricFactory(), RetryBackoffConfiguration.DEFAULT));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/cf996f1a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index d09eb78..c0ae5a1 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.events.RetryBackoffConfiguration;
 import org.apache.james.mailbox.util.EventCollector;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.assertj.core.api.SoftAssertions;
@@ -59,7 +60,7 @@ class InVmEventDeliveryTest {
         event = mock(MailboxListener.MailboxEvent.class);
         listener = mock(MailboxListener.class);
         listener2 = mock(MailboxListener.class);
-        inVmEventDelivery = new InVmEventDelivery(new NoopMetricFactory());
+        inVmEventDelivery = new InVmEventDelivery(new NoopMetricFactory(), RetryBackoffConfiguration.DEFAULT);
     }
 
     @Nested

http://git-wip-us.apache.org/repos/asf/james-project/blob/cf996f1a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 83056d1..2cd7252 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -28,6 +28,7 @@ import org.apache.james.lifecycle.api.Configurable;
 import org.apache.james.mailbox.MailboxListener;
 import org.apache.james.mailbox.events.EventBus;
 import org.apache.james.mailbox.events.InVMEventBus;
+import org.apache.james.mailbox.events.RetryBackoffConfiguration;
 import org.apache.james.mailbox.events.delivery.EventDelivery;
 import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
 import org.apache.james.server.core.configuration.ConfigurationProvider;
@@ -48,11 +49,13 @@ public class DefaultEventModule extends AbstractModule {
         bind(MailboxListenersLoaderImpl.class).in(Scopes.SINGLETON);
         bind(InVmEventDelivery.class).in(Scopes.SINGLETON);
         bind(InVMEventBus.class).in(Scopes.SINGLETON);
-        
+
         bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class);
         bind(EventDelivery.class).to(InVmEventDelivery.class);
         bind(EventBus.class).to(InVMEventBus.class);
 
+        bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
+
         Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/5] james-project git commit: MAILBOX-372 ErrorHandling is applied for only group registered listeners

Posted by bt...@apache.org.
MAILBOX-372 ErrorHandling is applied for only group registered listeners


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b22103d0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b22103d0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b22103d0

Branch: refs/heads/master
Commit: b22103d09b9b90bd4fb74828691df4e3687965f1
Parents: d4e5693
Author: tran tien duc <dt...@linagora.com>
Authored: Tue Jan 15 16:57:39 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Mon Jan 21 10:04:49 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/ErrorHandlingContract.java   |  17 +++
 .../james/mailbox/events/KeyContract.java       |  12 +-
 .../james/mailbox/events/InVMEventBus.java      |  19 ++--
 .../mailbox/events/delivery/EventDelivery.java  |   2 +
 .../events/delivery/InVmEventDelivery.java      |  35 +++++-
 .../events/delivery/InVmEventDeliveryTest.java  | 113 +++++++++++++++++--
 .../resources/META-INF/spring/event-system.xml  |   7 +-
 7 files changed, 171 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index c10c542..069798d 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -23,6 +23,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_ID;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -41,6 +42,8 @@ import org.apache.james.mailbox.util.EventCollector;
 import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.collect.ImmutableSet;
+
 interface ErrorHandlingContract extends EventBusContract {
 
     class ThrowingListener implements MailboxListener {
@@ -66,6 +69,20 @@ interface ErrorHandlingContract extends EventBusContract {
     }
 
     @Test
+    default void retryingIsNotAppliedForKeyRegistrations() {
+        EventCollector eventCollector = eventCollector();
+
+        doThrow(new RuntimeException())
+            .when(eventCollector).event(EVENT);
+
+        eventBus().register(eventCollector, KEY_1);
+        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+        assertThat(eventCollector.getEvents())
+            .isEmpty();
+    }
+
+    @Test
     default void listenerShouldReceiveWhenFailsLessThanMaxRetries() {
         EventCollector eventCollector = eventCollector();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index 317925a..6bf091b 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.events;
 
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
 import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_2;
@@ -35,7 +36,6 @@ import static org.mockito.Mockito.after;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -261,17 +261,11 @@ public interface KeyContract extends EventBusContract {
 
         @Test
         default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() {
-            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
-            doThrow(new RuntimeException())
-                .doThrow(new RuntimeException())
-                .doThrow(new RuntimeException())
-                .doThrow(new RuntimeException())
-                .doCallRealMethod()
-                .when(listener).event(any());
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
             eventBus().register(listener, KEY_1);
 
             eventBus().dispatch(EVENT, KEY_1).block();
-            eventBus().dispatch(EVENT, KEY_1).block();
+            eventBus().dispatch(EVENT_2, KEY_1).block();
 
             WAIT_CONDITION
                 .until(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1));

http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index 8069739..3bb11b0 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -30,10 +30,10 @@ import org.apache.james.mailbox.events.delivery.EventDelivery;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class InVMEventBus implements EventBus {
@@ -66,7 +66,10 @@ public class InVMEventBus implements EventBus {
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
         if (!event.isNoop()) {
-            return eventDelivery.deliver(registeredListeners(keys), event).synchronousListenerFuture()
+            return Flux.merge(
+                eventDelivery.deliverWithRetries(groups.values(), event).synchronousListenerFuture(),
+                eventDelivery.deliver(registeredListenersByKeys(keys), event).synchronousListenerFuture())
+                .then()
                 .onErrorResume(throwable -> Mono.empty());
         }
         return Mono.empty();
@@ -76,13 +79,9 @@ public class InVMEventBus implements EventBus {
         return groups.keySet();
     }
 
-    private Set<MailboxListener> registeredListeners(Set<RegistrationKey> keys) {
-        return ImmutableSet.<MailboxListener>builder()
-            .addAll(groups.values())
-            .addAll(keys.stream()
-                .flatMap(registrationKey -> registrations.get(registrationKey).stream())
-                .collect(Guavate.toImmutableList()))
-            .build();
+    private Set<MailboxListener> registeredListenersByKeys(Set<RegistrationKey> keys) {
+        return keys.stream()
+            .flatMap(registrationKey -> registrations.get(registrationKey).stream())
+            .collect(Guavate.toImmutableSet());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index a0240b2..359e4d7 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -49,4 +49,6 @@ public interface EventDelivery {
 
 
     ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event);
+
+    ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event);
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index e7eb233..90e64c8 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -40,6 +40,11 @@ import reactor.core.publisher.MonoProcessor;
 import reactor.core.scheduler.Schedulers;
 
 public class InVmEventDelivery implements EventDelivery {
+
+    private enum DeliveryOption {
+        NO_RETRY, WITH_RETRY
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
     private static final int MAX_RETRIES = 3;
     private static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
@@ -56,9 +61,20 @@ public class InVmEventDelivery implements EventDelivery {
 
     @Override
     public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) {
-        Mono<Void> synchronousListeners = doDeliver(filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event)
+        return deliverByOption(mailboxListeners, event, DeliveryOption.NO_RETRY);
+    }
+
+    @Override
+    public ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event) {
+        return deliverByOption(mailboxListeners, event, DeliveryOption.WITH_RETRY);
+    }
+
+    private ExecutionStages deliverByOption(Collection<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) {
+        Mono<Void> synchronousListeners = doDeliver(
+            filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event, deliveryOption)
             .subscribeWith(MonoProcessor.create());
-        Mono<Void> asyncListener = doDeliver(filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event)
+        Mono<Void> asyncListener = doDeliver(
+            filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event, deliveryOption)
             .subscribeWith(MonoProcessor.create());
 
         return new ExecutionStages(synchronousListeners, asyncListener);
@@ -69,20 +85,27 @@ public class InVmEventDelivery implements EventDelivery {
             .filter(listener -> listener.getExecutionMode() == executionMode);
     }
 
-    private Mono<Void> doDeliver(Stream<MailboxListener> mailboxListeners, Event event) {
+    private Mono<Void> doDeliver(Stream<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) {
         return Flux.fromStream(mailboxListeners)
-            .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener))
+            .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener, deliveryOption))
             .then()
             .subscribeOn(Schedulers.elastic());
     }
 
-    private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener) {
-        return Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event))
+    private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener, DeliveryOption deliveryOption) {
+        Mono<Void> firstDelivery = Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event))
             .doOnError(throwable -> LOGGER.error("Error while processing listener {} for {}",
                 listenerName(mailboxListener),
                 eventName(event),
                 throwable))
             .subscribeOn(Schedulers.elastic())
+            .then();
+
+        if (deliveryOption == DeliveryOption.NO_RETRY) {
+            return firstDelivery;
+        }
+
+        return firstDelivery
             .retryBackoff(MAX_RETRIES, FIRST_BACKOFF, MAX_BACKOFF, DEFAULT_JITTER_FACTOR)
             .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
                 listenerName(mailboxListener),

http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 207d319..d09eb78 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -87,19 +87,46 @@ class InVmEventDeliveryTest {
             void deliverShouldNotDeliverEventToListenerWhenException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
 
-                inVmEventDelivery.deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture().subscribe();
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(RuntimeException.class);
 
                 assertThat(syncEventCollector.getEvents())
                     .isEmpty();
             }
 
             @Test
-            void deliverShouldBeErrorWhenException() {
+            void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
 
                 assertThatThrownBy(() -> inVmEventDelivery
+                    .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(IllegalStateException.class);
+
+                assertThat(syncEventCollector.getEvents())
+                    .isEmpty();
+            }
+
+            @Test
+            void deliverShouldBeErrorWhenException() {
+                doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event);
+
+                assertThatThrownBy(() -> inVmEventDelivery
                     .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
                     .block())
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("mock exception");
+            }
+
+            @Test
+            void deliverWithRetriesShouldBeErrorWhenException() {
+                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
+
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture()
+                    .block())
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("Retries exhausted");
             }
@@ -109,10 +136,25 @@ class InVmEventDeliveryTest {
         class AsynchronousOnly {
             @Test
             void deliverShouldNotDeliverEventToListenerWhenException() {
+                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(RuntimeException.class);
 
+                assertThat(asyncEventCollector.getEvents())
+                    .isEmpty();
+            }
+
+            @Test
+            void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() {
                 doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
 
-                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture().subscribe();
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(IllegalStateException.class);
 
                 assertThat(asyncEventCollector.getEvents())
                     .isEmpty();
@@ -120,13 +162,24 @@ class InVmEventDeliveryTest {
 
             @Test
             void deliverShouldBeErrorWhenException() {
-                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+                doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event);
 
                 assertThatThrownBy(() -> inVmEventDelivery
                     .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
                     .block())
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Retries exhausted");
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("mock exception");
+            }
+
+            @Test
+            void deliverWithRetriesShouldBeErrorWhenException() {
+                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining("Retries exhausted");
             }
         }
 
@@ -136,7 +189,10 @@ class InVmEventDeliveryTest {
             void deliverShouldDeliverEventToSyncListenerWhenAsyncGetException() {
                 doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
 
-                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().subscribe();
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(RuntimeException.class);
 
                 SoftAssertions.assertSoftly(softly -> {
                     softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
@@ -146,6 +202,22 @@ class InVmEventDeliveryTest {
             }
 
             @Test
+            void deliverWithRetriesShouldDeliverEventToSyncListenerWhenAsyncGetException() {
+                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining("Retries exhausted");
+
+                SoftAssertions.assertSoftly(softly -> {
+                    softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
+                    softly.assertThat(syncEventCollector.getEvents()).hasSize(1);
+                });
+            }
+
+            @Test
             void deliverShouldDeliverEventToAsyncListenerWhenSyncGetException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
 
@@ -157,16 +229,41 @@ class InVmEventDeliveryTest {
                     softly.assertThat(syncEventCollector.getEvents()).isEmpty();
                     softly.assertThat(asyncEventCollector.getEvents()).hasSize(1);
                 });
+            }
+
+            @Test
+            void deliverWithRetriesShouldDeliverEventToAsyncListenerWhenSyncGetException() {
+                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
+
+                inVmEventDelivery.deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
+                    .onErrorResume(e -> Mono.empty())
+                    .block();
 
+                SoftAssertions.assertSoftly(softly -> {
+                    softly.assertThat(syncEventCollector.getEvents()).isEmpty();
+                    softly.assertThat(asyncEventCollector.getEvents()).hasSize(1);
+                });
             }
 
             @Test
             void deliverShouldBeErrorWhenException() {
+                doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event);
+                doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event);
+
+                assertThatThrownBy(() -> inVmEventDelivery
+                    .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
+                    .block())
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("mock exception");
+            }
+
+            @Test
+            void deliverWithRetriesShouldBeErrorWhenException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
                 doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
 
                 assertThatThrownBy(() -> inVmEventDelivery
-                    .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
+                    .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
                     .block())
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("Retries exhausted");

http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
----------------------------------------------------------------------
diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
index ff0447a..609d183 100644
--- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
+++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
@@ -20,14 +20,19 @@
 
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
        xsi:schemaLocation="
-          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+          http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
 
     <bean id="event-bus" class="org.apache.james.mailbox.events.InVMEventBus" lazy-init="true">
         <constructor-arg index="0" ref="event-delivery"/>
     </bean>
     <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.InVmEventDelivery" lazy-init="true">
         <constructor-arg index="0" ref="metricFactory"/>
+        <constructor-arg>
+            <util:constant static-field="org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT"/>
+        </constructor-arg>
     </bean>
 
 </beans>
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[5/5] james-project git commit: JAMES-2646 list sources cassandra table, DAO and tests

Posted by bt...@apache.org.
JAMES-2646 list sources cassandra table, DAO and tests


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d4e56936
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d4e56936
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d4e56936

Branch: refs/heads/master
Commit: d4e569361b47d683871161ce1759da29c1aea285
Parents: 7f2f8fe
Author: Rene Cordier <rc...@linagora.com>
Authored: Fri Jan 18 15:02:48 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Mon Jan 21 10:04:49 2019 +0700

----------------------------------------------------------------------
 .../java/org/apache/james/rrt/lib/Mapping.java  |   7 ++
 .../cassandra/CassandraMappingsSourcesDAO.java  | 101 +++++++++++++++++++
 .../james/rrt/cassandra/CassandraRRTModule.java |  13 ++-
 .../CassandraRecipientRewriteTable.java         |  24 +++--
 .../tables/CassandraMappingsSourcesTable.java   |  29 ++++++
 .../CassandraMappingsSourcesDAOTest.java        |  80 +++++++++++++++
 .../CassandraRecipientRewriteTableDAOTest.java  |  53 ++++++----
 .../CassandraRecipientRewriteTableTest.java     |   3 +-
 .../james/rrt/cassandra/CassandraStepdefs.java  |   3 +-
 9 files changed, 281 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-api/src/main/java/org/apache/james/rrt/lib/Mapping.java
----------------------------------------------------------------------
diff --git a/server/data/data-api/src/main/java/org/apache/james/rrt/lib/Mapping.java b/server/data/data-api/src/main/java/org/apache/james/rrt/lib/Mapping.java
index 379c9e7..b87b228 100644
--- a/server/data/data-api/src/main/java/org/apache/james/rrt/lib/Mapping.java
+++ b/server/data/data-api/src/main/java/org/apache/james/rrt/lib/Mapping.java
@@ -241,6 +241,11 @@ public interface Mapping {
         }
 
         @Override
+        public String getMappingValue() {
+            return mapping;
+        }
+
+        @Override
         public String getErrorMessage() {
             Preconditions.checkState(getType() == Type.Error);
             return mapping;
@@ -284,6 +289,8 @@ public interface Mapping {
     }
 
     Type getType();
+
+    String getMappingValue();
     
     String asString();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
new file mode 100644
index 0000000..777a50e
--- /dev/null
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
@@ -0,0 +1,101 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.rrt.cassandra.tables.CassandraMappingsSourcesTable.MAPPING_TYPE;
+import static org.apache.james.rrt.cassandra.tables.CassandraMappingsSourcesTable.MAPPING_VALUE;
+import static org.apache.james.rrt.cassandra.tables.CassandraMappingsSourcesTable.SOURCE;
+import static org.apache.james.rrt.cassandra.tables.CassandraMappingsSourcesTable.TABLE_NAME;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class CassandraMappingsSourcesDAO {
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement deleteStatement;
+    private final PreparedStatement retrieveSourcesStatement;
+
+    @Inject
+    CassandraMappingsSourcesDAO(Session session) {
+        this.executor = new CassandraAsyncExecutor(session);
+        this.insertStatement = prepareInsertStatement(session);
+        this.deleteStatement = prepareDelete(session);
+        this.retrieveSourcesStatement = prepareRetrieveSourcesStatement(session);
+    }
+
+    private PreparedStatement prepareInsertStatement(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(MAPPING_TYPE, bindMarker(MAPPING_TYPE))
+            .value(MAPPING_VALUE, bindMarker(MAPPING_VALUE))
+            .value(SOURCE, bindMarker(SOURCE)));
+    }
+
+    private PreparedStatement prepareDelete(Session session) {
+        return session.prepare(delete()
+            .from(TABLE_NAME)
+            .where(eq(MAPPING_TYPE, bindMarker(MAPPING_TYPE)))
+            .and(eq(MAPPING_VALUE, bindMarker(MAPPING_VALUE)))
+            .and(eq(SOURCE, bindMarker(SOURCE))));
+    }
+
+    private PreparedStatement prepareRetrieveSourcesStatement(Session session) {
+        return session.prepare(select(SOURCE)
+            .from(TABLE_NAME)
+            .where(eq(MAPPING_TYPE, bindMarker(MAPPING_TYPE)))
+            .and(eq(MAPPING_VALUE, bindMarker(MAPPING_VALUE))));
+    }
+
+    Mono<Void> addMapping(Mapping mapping, MappingSource source) {
+        return executor.executeVoidReactor(insertStatement.bind()
+            .setString(MAPPING_TYPE, mapping.getType().asPrefix())
+            .setString(MAPPING_VALUE, mapping.getMappingValue())
+            .setString(SOURCE, source.asMailAddressString()));
+    }
+
+    Mono<Void> removeMapping(Mapping mapping, MappingSource source) {
+        return executor.executeVoidReactor(deleteStatement.bind()
+            .setString(MAPPING_TYPE, mapping.getType().asPrefix())
+            .setString(MAPPING_VALUE, mapping.getMappingValue())
+            .setString(SOURCE, source.asMailAddressString()));
+    }
+
+    Flux<MappingSource> retrieveSources(Mapping mapping) {
+        return executor.executeReactor(retrieveSourcesStatement.bind()
+            .setString(MAPPING_TYPE, mapping.getType().asPrefix())
+            .setString(MAPPING_VALUE, mapping.getMappingValue()))
+            .flatMapMany(Flux::fromIterable)
+            .map(row -> MappingSource.parse(row.getString(SOURCE)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRRTModule.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRRTModule.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRRTModule.java
index 87a469b..102c9b6 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRRTModule.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRRTModule.java
@@ -23,12 +23,14 @@ import static com.datastax.driver.core.DataType.text;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.utils.CassandraConstants;
+import org.apache.james.rrt.cassandra.tables.CassandraMappingsSourcesTable;
 import org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable;
 
 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 
 public interface CassandraRRTModule {
-    CassandraModule MODULE = CassandraModule.table(CassandraRecipientRewriteTableTable.TABLE_NAME)
+    CassandraModule MODULE = CassandraModule.builder()
+        .table(CassandraRecipientRewriteTableTable.TABLE_NAME)
         .comment("Holds address re-writing rules.")
         .options(options -> options
             .caching(SchemaBuilder.KeyCaching.ALL,
@@ -37,5 +39,14 @@ public interface CassandraRRTModule {
             .addPartitionKey(CassandraRecipientRewriteTableTable.USER, text())
             .addClusteringColumn(CassandraRecipientRewriteTableTable.DOMAIN, text())
             .addClusteringColumn(CassandraRecipientRewriteTableTable.MAPPING, text()))
+        .table(CassandraMappingsSourcesTable.TABLE_NAME)
+        .comment("Projection table for retrieving sources associated with given mappings.")
+        .options(options -> options
+            .caching(SchemaBuilder.KeyCaching.ALL,
+                SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))
+        .statement(statement -> statement
+            .addPartitionKey(CassandraMappingsSourcesTable.MAPPING_TYPE, text())
+            .addPartitionKey(CassandraMappingsSourcesTable.MAPPING_VALUE, text())
+            .addClusteringColumn(CassandraMappingsSourcesTable.SOURCE, text()))
         .build();
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 27bc7d4..4c7a92b 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -31,40 +31,46 @@ import org.apache.james.rrt.lib.MappingsImpl;
 import org.apache.james.util.OptionalUtils;
 
 public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
-    private final CassandraRecipientRewriteTableDAO dao;
+    private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
 
     @Inject
-    public CassandraRecipientRewriteTable(CassandraRecipientRewriteTableDAO dao) {
-        this.dao = dao;
+    public CassandraRecipientRewriteTable(CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO, CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO) {
+        this.cassandraRecipientRewriteTableDAO = cassandraRecipientRewriteTableDAO;
+        this.cassandraMappingsSourcesDAO = cassandraMappingsSourcesDAO;
     }
 
     @Override
     public void addMapping(MappingSource source, Mapping mapping) {
-        dao.addMapping(source, mapping).block();
+        cassandraRecipientRewriteTableDAO.addMapping(source, mapping)
+            .then(cassandraMappingsSourcesDAO.addMapping(mapping, source))
+            .block();
     }
 
     @Override
     public void removeMapping(MappingSource source, Mapping mapping) {
-        dao.removeMapping(source, mapping).block();
+        cassandraRecipientRewriteTableDAO.removeMapping(source, mapping)
+            .then(cassandraMappingsSourcesDAO.removeMapping(mapping, source))
+            .block();
     }
 
     @Override
     public Mappings getStoredMappings(MappingSource source) {
-        return dao.retrieveMappings(source)
+        return cassandraRecipientRewriteTableDAO.retrieveMappings(source)
             .blockOptional()
             .orElse(MappingsImpl.empty());
     }
 
     @Override
     public Map<MappingSource, Mappings> getAllMappings() {
-        return dao.getAllMappings().block();
+        return cassandraRecipientRewriteTableDAO.getAllMappings().block();
     }
 
     @Override
     protected Mappings mapAddress(String user, Domain domain) {
         return OptionalUtils.orSuppliers(
-            () -> dao.retrieveMappings(MappingSource.fromUser(user, domain)).blockOptional(),
-            () -> dao.retrieveMappings(MappingSource.fromDomain(domain)).blockOptional())
+            () -> cassandraRecipientRewriteTableDAO.retrieveMappings(MappingSource.fromUser(user, domain)).blockOptional(),
+            () -> cassandraRecipientRewriteTableDAO.retrieveMappings(MappingSource.fromDomain(domain)).blockOptional())
                 .orElse(MappingsImpl.empty());
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/tables/CassandraMappingsSourcesTable.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/tables/CassandraMappingsSourcesTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/tables/CassandraMappingsSourcesTable.java
new file mode 100644
index 0000000..81a3c31
--- /dev/null
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/tables/CassandraMappingsSourcesTable.java
@@ -0,0 +1,29 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra.tables;
+
+public interface CassandraMappingsSourcesTable {
+
+    String TABLE_NAME = "mappings_sources";
+
+    String MAPPING_TYPE = "mapping_type";
+    String MAPPING_VALUE = "mapping_value";
+    String SOURCE = "source";
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
new file mode 100644
index 0000000..5d0a125
--- /dev/null
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
@@ -0,0 +1,80 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.core.Domain;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class CassandraMappingsSourcesDAOTest {
+    private static final String USER = "test";
+    private static final String ADDRESS = "test@domain";
+    private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
+    private static final Mapping MAPPING = Mapping.alias(ADDRESS);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);
+
+    private static CassandraMappingsSourcesDAO dao;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        dao = new CassandraMappingsSourcesDAO(cassandra.getConf());
+    }
+
+    @Test
+    void retrieveSourcesShouldReturnEmptyByDefault() {
+        assertThat(dao.retrieveSources(MAPPING).collectList().block()).isEmpty();
+    }
+
+    @Test
+    void retrieveSourcesShouldReturnStoredMappingSource() {
+        dao.addMapping(MAPPING, SOURCE).block();
+
+        assertThat(dao.retrieveSources(MAPPING).collectList().block()).containsOnly(SOURCE);
+    }
+
+    @Test
+    void retrieveSourcesShouldNotReturnRemovedMapping() {
+        dao.addMapping(MAPPING, SOURCE).block();
+
+        dao.removeMapping(MAPPING, SOURCE).block();
+
+        assertThat(dao.retrieveSources(MAPPING).collectList().block()).isEmpty();
+    }
+
+    @Test
+    void retrieveSourcesShouldReturnMultipleStoredMappingSourcesForMapping() {
+        MappingSource source2 = MappingSource.fromUser("bob", Domain.LOCALHOST);
+
+        dao.addMapping(MAPPING, SOURCE).block();
+        dao.addMapping(MAPPING, source2).block();
+
+        assertThat(dao.retrieveSources(MAPPING).collectList().block()).containsOnly(SOURCE, source2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
index 0d2f096..3abc541 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
@@ -36,65 +36,78 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 class CassandraRecipientRewriteTableDAOTest {
     private static final String USER = "test";
     private static final String ADDRESS = "test@domain";
+    private static final String ADDRESS_2 = "test2@domain";
     private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
     private static final Mapping MAPPING = Mapping.alias(ADDRESS);
+    private static final Mapping MAPPING_2 = Mapping.alias(ADDRESS_2);
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);
 
-    private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private CassandraRecipientRewriteTableDAO dao;
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        dao = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
     @Test
     void retrieveMappingsShouldReturnEmptyByDefault() {
-        assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
-            .isEmpty();
+        assertThat(dao.retrieveMappings(SOURCE).blockOptional()).isEmpty();
     }
 
     @Test
     void getAllMappingsShouldReturnEmptyByDefault() {
-        assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
-            .isEmpty();
+        assertThat(dao.getAllMappings().block()).isEmpty();
     }
 
     @Test
     void retrieveMappingsShouldReturnStoredMapping() {
-        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+        dao.addMapping(SOURCE, MAPPING).block();
 
-        assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
-            .contains(MappingsImpl.fromMappings(MAPPING));
+        assertThat(dao.retrieveMappings(SOURCE).blockOptional()).contains(MappingsImpl.fromMappings(MAPPING));
     }
 
     @Test
     void getAllMappingsShouldReturnStoredMapping() {
-        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+        dao.addMapping(SOURCE, MAPPING).block();
 
-        assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
-            .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING)));
+        assertThat(dao.getAllMappings().block()).contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING)));
     }
 
     @Test
     void retrieveMappingsShouldNotReturnRemovedMapping() {
-        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+        dao.addMapping(SOURCE, MAPPING).block();
 
-        cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block();
+        dao.removeMapping(SOURCE, MAPPING).block();
 
-        assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
-            .isEmpty();
+        assertThat(dao.retrieveMappings(SOURCE).blockOptional()).isEmpty();
     }
 
     @Test
     void getAllMappingsShouldNotReturnRemovedMapping() {
-        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+        dao.addMapping(SOURCE, MAPPING).block();
 
-        cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block();
+        dao.removeMapping(SOURCE, MAPPING).block();
 
-        assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
-            .isEmpty();
+        assertThat(dao.getAllMappings().block()).isEmpty();
     }
 
+    @Test
+    void retrieveMappingsShouldReturnMultipleStoredMappingsOfSource() {
+        dao.addMapping(SOURCE, MAPPING).block();
+        dao.addMapping(SOURCE, MAPPING_2).block();
+
+        assertThat(dao.retrieveMappings(SOURCE).blockOptional())
+            .contains(MappingsImpl.fromMappings(MAPPING, MAPPING_2));
+    }
+
+    @Test
+    void getAllMappingsShouldReturnMultipleStoredMappings() {
+        dao.addMapping(SOURCE, MAPPING).block();
+        dao.addMapping(SOURCE, MAPPING_2).block();
+
+        assertThat(dao.getAllMappings().block())
+            .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING, MAPPING_2)));
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
index d0bc05c..f9d27a1 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
@@ -62,7 +62,8 @@ public class CassandraRecipientRewriteTableTest extends AbstractRecipientRewrite
     @Override
     protected AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception {
         CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(
-            new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION));
+            new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION),
+            new CassandraMappingsSourcesDAO(cassandra.getConf()));
         rrt.configure(new DefaultConfigurationBuilder());
         return rrt;
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d4e56936/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
index edec0ca..d9aeac4 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
@@ -50,7 +50,8 @@ public class CassandraStepdefs {
 
     private AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception {
         CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(
-            new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION));
+            new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION),
+            new CassandraMappingsSourcesDAO(cassandra.getConf()));
         rrt.configure(new DefaultConfigurationBuilder());
         return rrt;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[4/5] james-project git commit: MAILBOX-372 make configuration public

Posted by bt...@apache.org.
MAILBOX-372 make configuration public


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2d48bb03
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2d48bb03
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2d48bb03

Branch: refs/heads/master
Commit: 2d48bb0308336a766d24a584469047b2b17e5619
Parents: b22103d
Author: tran tien duc <dt...@linagora.com>
Authored: Wed Jan 16 17:10:09 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Mon Jan 21 10:04:49 2019 +0700

----------------------------------------------------------------------
 .../org/apache/james/mailbox/events/RetryBackoffConfiguration.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/2d48bb03/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
index ef16efb..f802d5e 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -25,7 +25,7 @@ import java.util.Objects;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
-class RetryBackoffConfiguration {
+public class RetryBackoffConfiguration {
 
     @FunctionalInterface
     interface RequireMaxRetries {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[3/5] james-project git commit: MAILBOX-371 Test out of order registrations

Posted by bt...@apache.org.
MAILBOX-371 Test out of order registrations


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7f2f8fec
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7f2f8fec
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7f2f8fec

Branch: refs/heads/master
Commit: 7f2f8fec82118a8a8a31917bf779e2674f2f2cb2
Parents: a760297
Author: Benoit Tellier <bt...@linagora.com>
Authored: Fri Jan 18 12:07:31 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Mon Jan 21 10:04:49 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/GroupContract.java     | 24 ++++++++++++++++++++
 .../james/mailbox/events/KeyContract.java       | 24 ++++++++++++++++++++
 2 files changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/7f2f8fec/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 93194e4..ac15639 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -73,6 +73,18 @@ public interface GroupContract {
         }
 
         @Test
+        default void registerShouldNotDispatchPastEventsForGroups() throws Exception {
+            MailboxListener listener = newListener();
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            eventBus().register(listener, GROUP_A);
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
         default void listenerGroupShouldReceiveEvents() throws Exception {
             MailboxListener listener = newListener();
 
@@ -254,5 +266,17 @@ public interface GroupContract {
             verify(mailboxListener, after(FIVE_HUNDRED_MS).never())
                 .event(any());
         }
+
+        @Test
+        default void registerShouldNotDispatchPastEventsForGroupsInADistributedContext() throws Exception {
+            MailboxListener listener = newListener();
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            eventBus2().register(listener, GROUP_A);
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7f2f8fec/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index 6e82af0..317925a 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -164,6 +164,18 @@ public interface KeyContract extends EventBusContract {
         }
 
         @Test
+        default void registerShouldNotDispatchPastEvents() throws Exception {
+            MailboxListener listener = newListener();
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            eventBus().register(listener, KEY_1);
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
         default void callingAllUnregisterMethodShouldUnregisterTheListener() throws Exception {
             MailboxListener listener = newListener();
             Registration registration = eventBus().register(listener, KEY_1);
@@ -321,5 +333,17 @@ public interface KeyContract extends EventBusContract {
             verify(mailboxListener2, timeout(ONE_SECOND).times(1)).event(any());
         }
 
+        @Test
+        default void registerShouldNotDispatchPastEventsInDistributedContext() throws Exception {
+            MailboxListener listener = newListener();
+
+            eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            eventBus().register(listener, KEY_1);
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org