You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/15 03:21:40 UTC
[05/30] james-project git commit: MAILBOX-372 InVmEventDelivery exponential backoff
MAILBOX-372 InVmEventDelivery exponential backoff
- InVmEventDelivery returns an error when delivery still fails after the
retries are exhausted, but InVMEventBus always returns success when dispatching
- Add ErrorHandlingContract (extending EventBusContract), testing max retries
and exponential backoff delay
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0707db50
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0707db50
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0707db50
Branch: refs/heads/master
Commit: 0707db5008367ca9963b4a939eac94b4b9b21dec
Parents: 48bc40f
Author: tran tien duc <dt...@linagora.com>
Authored: Wed Jan 9 19:10:37 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 15 09:55:22 2019 +0700
----------------------------------------------------------------------
.../mailbox/events/ErrorHandlingContract.java | 152 +++++++++++++++++++
.../james/mailbox/events/InVMEventBus.java | 3 +-
.../events/delivery/InVmEventDelivery.java | 33 +++-
.../james/mailbox/events/InVMEventBusTest.java | 4 +-
.../events/delivery/InVmEventDeliveryTest.java | 33 ++--
5 files changed, 203 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
new file mode 100644
index 0000000..4b17a6f
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -0,0 +1,152 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.util.EventCollector;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Stopwatch;
+
+interface ErrorHandlingContract extends EventBusContract {
+
+ class ThrowingListener implements MailboxListener {
+
+ private List<Instant> timeElapsed;
+
+ private ThrowingListener() {
+ timeElapsed = new ArrayList<>();
+ }
+
+ @Override
+ public ListenerType getType() {
+ return MailboxListener.ListenerType.ONCE;
+ }
+
+ @Override
+ public void event(Event event) {
+ timeElapsed.add(Instant.now());
+ throw new RuntimeException("throw to trigger reactor retry");
+ }
+ }
+
+ default EventCollector eventCollector() {
+ return spy(new EventCollector());
+ }
+
+ default ThrowingListener throwingListener() {
+ return new ThrowingListener();
+ }
+
+ default long recordTimeRun(Runnable operation) {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ operation.run();
+ return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ default void listenerShouldReceiveWhenFailsLessThanMaxRetries() {
+ EventCollector eventCollector = eventCollector();
+
+ doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doCallRealMethod()
+ .when(eventCollector).event(EVENT);
+
+ eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ assertThat(eventCollector.getEvents())
+ .hasSize(1);
+ }
+
+ @Test
+ default void listenerShouldReceiveWhenFailsEqualsMaxRetries() {
+ EventCollector eventCollector = eventCollector();
+
+ doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doCallRealMethod()
+ .when(eventCollector).event(EVENT);
+
+ eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ assertThat(eventCollector.getEvents())
+ .hasSize(1);
+ }
+
+ @Test
+ default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() {
+ EventCollector eventCollector = eventCollector();
+
+ doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doCallRealMethod()
+ .when(eventCollector).event(EVENT);
+
+ eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ assertThat(eventCollector.getEvents())
+ .isEmpty();
+ }
+
+ @Test
+ default void retriesBackOffShouldDelayByExponentialGrowth() {
+ ThrowingListener throwingListener = throwingListener();
+
+ eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(throwingListener.timeElapsed).hasSize(4);
+
+ long minFirstDelayAfter = 100; // first backOff
+ long minSecondDelayAfter = 100; // 200 * jitter factor (200 * 0.5)
+ long minThirdDelayAfter = 200; // 400 * jitter factor (400 * 0.5)
+
+ softly.assertThat(throwingListener.timeElapsed.get(1))
+ .isAfterOrEqualTo(throwingListener.timeElapsed.get(0).plusMillis(minFirstDelayAfter));
+
+ softly.assertThat(throwingListener.timeElapsed.get(2))
+ .isAfterOrEqualTo(throwingListener.timeElapsed.get(1).plusMillis(minSecondDelayAfter));
+
+ softly.assertThat(throwingListener.timeElapsed.get(3))
+ .isAfterOrEqualTo(throwingListener.timeElapsed.get(2).plusMillis(minThirdDelayAfter));
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index 0f452bf..aad1be0 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -63,7 +63,8 @@ public class InVMEventBus implements EventBus {
@Override
public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
if (!event.isNoop()) {
- return eventDelivery.deliver(registeredListeners(keys), event).synchronousListenerFuture();
+ return eventDelivery.deliver(registeredListeners(keys), event).synchronousListenerFuture()
+ .onErrorResume(throwable -> Mono.empty());
}
return Mono.empty();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 2d2ef65..63edafb 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.events.delivery;
+import java.time.Duration;
import java.util.Collection;
import javax.inject.Inject;
@@ -40,6 +41,10 @@ import reactor.core.scheduler.Schedulers;
public class InVmEventDelivery implements EventDelivery {
private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
+ private static final int MAX_RETRIES = 3;
+ private static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
+ private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
+ private static final double DEFAULT_JITTER_FACTOR = 0.5;
private final MetricFactory metricFactory;
@@ -72,22 +77,40 @@ public class InVmEventDelivery implements EventDelivery {
private Mono<Void> doDeliver(Collection<MailboxListener> mailboxListeners, Event event) {
return Flux.fromIterable(mailboxListeners)
- .flatMap(mailboxListener -> Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event)))
+ .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener))
.then()
.subscribeOn(Schedulers.elastic());
}
+ private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener) {
+ return Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event))
+ .doOnError(throwable -> LOGGER.error("Error while processing listener {} for {}",
+ listenerName(mailboxListener),
+ eventName(event),
+ throwable))
+ .retryBackoff(MAX_RETRIES, FIRST_BACKOFF, MAX_BACKOFF, DEFAULT_JITTER_FACTOR)
+ .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
+ listenerName(mailboxListener),
+ MAX_RETRIES,
+ eventName(event),
+ throwable))
+ .then();
+ }
+
private void doDeliverToListener(MailboxListener mailboxListener, Event event) {
TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
try {
mailboxListener.event(event);
- } catch (Throwable throwable) {
- LOGGER.error("Error while processing listener {} for {}",
- mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
- throwable);
} finally {
timer.stopAndPublish();
}
}
+ private String listenerName(MailboxListener mailboxListener) {
+ return mailboxListener.getClass().getCanonicalName();
+ }
+
+ private String eventName(Event event) {
+ return event.getClass().getCanonicalName();
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
index 0241ac7..8df4557 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
@@ -23,7 +23,9 @@ import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
import org.apache.james.metrics.api.NoopMetricFactory;
import org.junit.jupiter.api.BeforeEach;
-public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract {
+public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract,
+ ErrorHandlingContract{
+
private InVMEventBus eventBus;
@BeforeEach
http://git-wip-us.apache.org/repos/asf/james-project/blob/0707db50/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 3f46019..77194fb 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -20,7 +20,7 @@
package org.apache.james.mailbox.events.delivery;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertTimeout;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@@ -85,20 +85,21 @@ class InVmEventDeliveryTest {
void deliverShouldNotDeliverEventToListenerWhenException() {
doThrow(RuntimeException.class).when(syncEventCollector).event(event);
- inVmEventDelivery.deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture().block();
+ inVmEventDelivery.deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture().subscribe();
assertThat(syncEventCollector.getEvents())
.isEmpty();
}
@Test
- void deliverShouldBeSuccessWhenException() {
+ void deliverShouldBeErrorWhenException() {
doThrow(RuntimeException.class).when(syncEventCollector).event(event);
- assertThatCode(() -> inVmEventDelivery
+ assertThatThrownBy(() -> inVmEventDelivery
.deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
.block())
- .doesNotThrowAnyException();
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Retries exhausted");
}
}
@@ -109,30 +110,31 @@ class InVmEventDeliveryTest {
doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
- inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture().block();
+ inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture().subscribe();
assertThat(asyncEventCollector.getEvents())
.isEmpty();
}
@Test
- void deliverShouldBeSuccessWhenException() {
+ void deliverShouldBeErrorWhenException() {
doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
- assertThatCode(() -> inVmEventDelivery
+ assertThatThrownBy(() -> inVmEventDelivery
.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
.block())
- .doesNotThrowAnyException();
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Retries exhausted");
}
}
@Nested
- class BothAsynchronousAnsSynchronous {
+ class BothAsynchronousAndSynchronous {
@Test
void deliverShouldDeliverEventToSyncListenerWhenAsyncGetException() {
doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
- inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().block();
+ inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().subscribe();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
@@ -145,7 +147,7 @@ class InVmEventDeliveryTest {
void deliverShouldDeliverEventToAsyncListenerWhenSyncGetException() {
doThrow(RuntimeException.class).when(syncEventCollector).event(event);
- inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().block();
+ inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().subscribe();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(syncEventCollector.getEvents()).isEmpty();
@@ -155,14 +157,15 @@ class InVmEventDeliveryTest {
}
@Test
- void deliverShouldBeSuccessWhenException() {
+ void deliverShouldBeErrorWhenException() {
doThrow(RuntimeException.class).when(syncEventCollector).event(event);
doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
- assertThatCode(() -> inVmEventDelivery
+ assertThatThrownBy(() -> inVmEventDelivery
.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
.block())
- .doesNotThrowAnyException();
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Retries exhausted");
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org