You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/15 03:21:40 UTC

[05/30] james-project git commit: MAILBOX-372 InVmEventDelivery exponential backoff

MAILBOX-372 InVmEventDelivery exponential backoff

- InVmEventDelivery returns an error when delivery fails, but InVMEventBus
always returns success when dispatching
- Add ErrorHandlingContract, extending EventBusContract, to test max retries
and exponential backoff delay


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0707db50
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0707db50
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0707db50

Branch: refs/heads/master
Commit: 0707db5008367ca9963b4a939eac94b4b9b21dec
Parents: 48bc40f
Author: tran tien duc <dt...@linagora.com>
Authored: Wed Jan 9 19:10:37 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 15 09:55:22 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/ErrorHandlingContract.java   | 152 +++++++++++++++++++
 .../james/mailbox/events/InVMEventBus.java      |   3 +-
 .../events/delivery/InVmEventDelivery.java      |  33 +++-
 .../james/mailbox/events/InVMEventBusTest.java  |   4 +-
 .../events/delivery/InVmEventDeliveryTest.java  |  33 ++--
 5 files changed, 203 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
new file mode 100644
index 0000000..4b17a6f
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -0,0 +1,152 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.util.EventCollector;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Stopwatch;
+
+interface ErrorHandlingContract extends EventBusContract {
+
+    class ThrowingListener implements MailboxListener {
+
+        private List<Instant> timeElapsed;
+
+        private ThrowingListener() {
+            timeElapsed = new ArrayList<>();
+        }
+
+        @Override
+        public ListenerType getType() {
+            return MailboxListener.ListenerType.ONCE;
+        }
+
+        @Override
+        public void event(Event event) {
+            timeElapsed.add(Instant.now());
+            throw new RuntimeException("throw to trigger reactor retry");
+        }
+    }
+
+    default EventCollector eventCollector() {
+        return spy(new EventCollector());
+    }
+
+    default ThrowingListener throwingListener() {
+        return new ThrowingListener();
+    }
+
+    default long recordTimeRun(Runnable operation) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        operation.run();
+        return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    default void listenerShouldReceiveWhenFailsLessThanMaxRetries() {
+        EventCollector eventCollector = eventCollector();
+
+        doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doCallRealMethod()
+            .when(eventCollector).event(EVENT);
+
+        eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+        eventBus().dispatch(EVENT, NO_KEYS).block();
+
+        assertThat(eventCollector.getEvents())
+            .hasSize(1);
+    }
+
+    @Test
+    default void listenerShouldReceiveWhenFailsEqualsMaxRetries() {
+        EventCollector eventCollector = eventCollector();
+
+        doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doCallRealMethod()
+            .when(eventCollector).event(EVENT);
+
+        eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+        eventBus().dispatch(EVENT, NO_KEYS).block();
+
+        assertThat(eventCollector.getEvents())
+            .hasSize(1);
+    }
+
+    @Test
+    default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() {
+        EventCollector eventCollector = eventCollector();
+
+        doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doCallRealMethod()
+            .when(eventCollector).event(EVENT);
+
+        eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+        eventBus().dispatch(EVENT, NO_KEYS).block();
+
+        assertThat(eventCollector.getEvents())
+            .isEmpty();
+    }
+
+    @Test
+    default void retriesBackOffShouldDelayByExponentialGrowth() {
+        ThrowingListener throwingListener = throwingListener();
+
+        eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+        eventBus().dispatch(EVENT, NO_KEYS).block();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(throwingListener.timeElapsed).hasSize(4);
+
+            long minFirstDelayAfter = 100; // first backOff
+            long minSecondDelayAfter = 100; // 200 * jitter factor (200 * 0.5)
+            long minThirdDelayAfter = 200; // 400 * jitter factor (400 * 0.5)
+
+            softly.assertThat(throwingListener.timeElapsed.get(1))
+                .isAfterOrEqualTo(throwingListener.timeElapsed.get(0).plusMillis(minFirstDelayAfter));
+
+            softly.assertThat(throwingListener.timeElapsed.get(2))
+                .isAfterOrEqualTo(throwingListener.timeElapsed.get(1).plusMillis(minSecondDelayAfter));
+
+            softly.assertThat(throwingListener.timeElapsed.get(3))
+                .isAfterOrEqualTo(throwingListener.timeElapsed.get(2).plusMillis(minThirdDelayAfter));
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index 0f452bf..aad1be0 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -63,7 +63,8 @@ public class InVMEventBus implements EventBus {
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
         if (!event.isNoop()) {
-            return eventDelivery.deliver(registeredListeners(keys), event).synchronousListenerFuture();
+            return eventDelivery.deliver(registeredListeners(keys), event).synchronousListenerFuture()
+                .onErrorResume(throwable -> Mono.empty());
         }
         return Mono.empty();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 2d2ef65..63edafb 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.events.delivery;
 
+import java.time.Duration;
 import java.util.Collection;
 
 import javax.inject.Inject;
@@ -40,6 +41,10 @@ import reactor.core.scheduler.Schedulers;
 
 public class InVmEventDelivery implements EventDelivery {
     private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
+    private static final int MAX_RETRIES = 3;
+    private static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
+    private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
+    private static final double DEFAULT_JITTER_FACTOR = 0.5;
 
     private final MetricFactory metricFactory;
 
@@ -72,22 +77,40 @@ public class InVmEventDelivery implements EventDelivery {
 
     private Mono<Void> doDeliver(Collection<MailboxListener> mailboxListeners, Event event) {
         return Flux.fromIterable(mailboxListeners)
-            .flatMap(mailboxListener -> Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event)))
+            .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener))
             .then()
             .subscribeOn(Schedulers.elastic());
     }
 
+    private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener) {
+        return Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event))
+            .doOnError(throwable -> LOGGER.error("Error while processing listener {} for {}",
+                listenerName(mailboxListener),
+                eventName(event),
+                throwable))
+            .retryBackoff(MAX_RETRIES, FIRST_BACKOFF, MAX_BACKOFF, DEFAULT_JITTER_FACTOR)
+            .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
+                listenerName(mailboxListener),
+                MAX_RETRIES,
+                eventName(event),
+                throwable))
+            .then();
+    }
+
     private void doDeliverToListener(MailboxListener mailboxListener, Event event) {
         TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
         try {
             mailboxListener.event(event);
-        } catch (Throwable throwable) {
-            LOGGER.error("Error while processing listener {} for {}",
-                mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
-                throwable);
         } finally {
             timer.stopAndPublish();
         }
     }
 
+    private String listenerName(MailboxListener mailboxListener) {
+        return mailboxListener.getClass().getCanonicalName();
+    }
+
+    private String eventName(Event event) {
+        return event.getClass().getCanonicalName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
index 0241ac7..8df4557 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
@@ -23,7 +23,9 @@ import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 
-public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract {
+public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract,
+    ErrorHandlingContract{
+
     private InVMEventBus eventBus;
 
     @BeforeEach

http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 3f46019..77194fb 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -20,7 +20,7 @@
 package org.apache.james.mailbox.events.delivery;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertTimeout;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -85,20 +85,21 @@ class InVmEventDeliveryTest {
             void deliverShouldNotDeliverEventToListenerWhenException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
 
-                inVmEventDelivery.deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture().block();
+                inVmEventDelivery.deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture().subscribe();
 
                 assertThat(syncEventCollector.getEvents())
                     .isEmpty();
             }
 
             @Test
-            void deliverShouldBeSuccessWhenException() {
+            void deliverShouldBeErrorWhenException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
 
-                assertThatCode(() -> inVmEventDelivery
+                assertThatThrownBy(() -> inVmEventDelivery
                     .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
                     .block())
-                .doesNotThrowAnyException();
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Retries exhausted");
             }
         }
 
@@ -109,30 +110,31 @@ class InVmEventDeliveryTest {
 
                 doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
 
-                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture().block();
+                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture().subscribe();
 
                 assertThat(asyncEventCollector.getEvents())
                     .isEmpty();
             }
 
             @Test
-            void deliverShouldBeSuccessWhenException() {
+            void deliverShouldBeErrorWhenException() {
                 doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
 
-                assertThatCode(() -> inVmEventDelivery
+                assertThatThrownBy(() -> inVmEventDelivery
                     .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
                     .block())
-                .doesNotThrowAnyException();
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Retries exhausted");
             }
         }
 
         @Nested
-        class BothAsynchronousAnsSynchronous {
+        class BothAsynchronousAndSynchronous {
             @Test
             void deliverShouldDeliverEventToSyncListenerWhenAsyncGetException() {
                 doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
 
-                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().block();
+                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().subscribe();
 
                 SoftAssertions.assertSoftly(softly -> {
                     softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
@@ -145,7 +147,7 @@ class InVmEventDeliveryTest {
             void deliverShouldDeliverEventToAsyncListenerWhenSyncGetException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
 
-                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().block();
+                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().subscribe();
 
                 SoftAssertions.assertSoftly(softly -> {
                     softly.assertThat(syncEventCollector.getEvents()).isEmpty();
@@ -155,14 +157,15 @@ class InVmEventDeliveryTest {
             }
 
             @Test
-            void deliverShouldBeSuccessWhenException() {
+            void deliverShouldBeErrorWhenException() {
                 doThrow(RuntimeException.class).when(syncEventCollector).event(event);
                 doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
 
-                assertThatCode(() -> inVmEventDelivery
+                assertThatThrownBy(() -> inVmEventDelivery
                     .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
                     .block())
-                .doesNotThrowAnyException();
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Retries exhausted");
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org