You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/16 11:11:51 UTC

[3/3] james-project git commit: MAILBOX-372 handle retrying for event group consumers

MAILBOX-372 handle retrying for event group consumers


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/1f390f4f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/1f390f4f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/1f390f4f

Branch: refs/heads/master
Commit: 1f390f4f3f69db1c073b5e018b25cdf52acb3647
Parents: bd656cf
Author: tran tien duc <dt...@linagora.com>
Authored: Tue Jan 15 14:58:45 2019 +0700
Committer: tran tien duc <dt...@linagora.com>
Committed: Wed Jan 16 14:21:28 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/RetryBackoff.java      | 110 ---------------
 .../events/RetryBackoffConfiguration.java       | 125 +++++++++++++++++
 .../mailbox/events/ErrorHandlingContract.java   |  45 ++++--
 .../mailbox/events/EventBusTestFixture.java     |   4 +-
 .../events/RetryBackoffConfigurationTest.java   | 140 +++++++++++++++++++
 .../mailbox/events/GroupConsumerRetry.java      | 133 ++++++++++++++++++
 .../james/mailbox/events/GroupRegistration.java |  62 +++++---
 .../events/GroupRegistrationHandler.java        |   5 +-
 .../james/mailbox/events/RabbitMQEventBus.java  |   8 +-
 .../mailbox/events/WaitDelayGenerator.java      |  18 ++-
 .../mailbox/events/RabbitMQEventBusTest.java    |  26 +++-
 .../mailbox/events/WaitDelayGeneratorTest.java  |  49 ++++++-
 12 files changed, 565 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java
deleted file mode 100644
index 2fa15ad..0000000
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events;
-
-import java.time.Duration;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-class RetryBackoff {
-
-    @FunctionalInterface
-    interface RequireMaxRetries {
-        RequireFirstBackoff maxRetries(int maxRetries);
-    }
-
-    @FunctionalInterface
-    interface RequireFirstBackoff {
-        RequireJitterFactor firstBackoff(Duration firstBackoff);
-    }
-
-    @FunctionalInterface
-    interface RequireJitterFactor {
-        ReadyToBuild jitterFactor(double jitterFactor);
-    }
-
-    static class ReadyToBuild {
-        private final int maxRetries;
-        private final Duration firstBackoff;
-        private final double jitterFactor;
-
-        private ReadyToBuild(int maxRetries, Duration firstBackoff, double jitterFactor) {
-            this.maxRetries = maxRetries;
-            this.firstBackoff = firstBackoff;
-            this.jitterFactor = jitterFactor;
-        }
-
-        RetryBackoff build() {
-            return new RetryBackoff(maxRetries, firstBackoff, jitterFactor);
-        }
-    }
-
-    static RequireMaxRetries builder() {
-        return maxRetries -> firstBackoff -> jitterFactor -> new ReadyToBuild(maxRetries, firstBackoff, jitterFactor);
-    }
-
-    static RetryBackoff defaultRetryBackoff() {
-        return builder()
-            .maxRetries(DEFAULT_MAX_RETRIES)
-            .firstBackoff(DEFAULT_FIRST_BACKOFF)
-            .jitterFactor(DEFAULT_JITTER_FACTOR)
-            .build();
-    }
-
-    private static final double DEFAULT_JITTER_FACTOR = 0.5;
-    private static final int DEFAULT_MAX_RETRIES = 3;
-    private static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
-
-    private final int maxRetries;
-    private final Duration firstBackoff;
-    private final double jitterFactor;
-
-    RetryBackoff(int maxRetries, Duration firstBackoff, double jitterFactor) {
-        Preconditions.checkArgument(!firstBackoff.isNegative() && !firstBackoff.isZero(), "firstBackoff has to be strictly positive");
-        Preconditions.checkArgument(maxRetries > 0, "maxRetries has to be strictly positive");
-        Preconditions.checkArgument(jitterFactor > 0, "jitterFactor has to be strictly positive");
-
-        this.maxRetries = maxRetries;
-        this.firstBackoff = firstBackoff;
-        this.jitterFactor = jitterFactor;
-    }
-
-    public int getMaxRetries() {
-        return maxRetries;
-    }
-
-    public Duration getFirstBackoff() {
-        return firstBackoff;
-    }
-
-    public double getJitterFactor() {
-        return jitterFactor;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("maxRetries", maxRetries)
-            .add("firstBackoff", firstBackoff)
-            .add("jitterFactor", jitterFactor)
-            .toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
new file mode 100644
index 0000000..ef16efb
--- /dev/null
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -0,0 +1,125 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
+class RetryBackoffConfiguration {
+
+    @FunctionalInterface
+    interface RequireMaxRetries {
+        RequireFirstBackoff maxRetries(int maxRetries);
+    }
+
+    @FunctionalInterface
+    interface RequireFirstBackoff {
+        RequireJitterFactor firstBackoff(Duration firstBackoff);
+    }
+
+    @FunctionalInterface
+    interface RequireJitterFactor {
+        ReadyToBuild jitterFactor(double jitterFactor);
+    }
+
+    static class ReadyToBuild {
+        private final int maxRetries;
+        private final Duration firstBackoff;
+        private final double jitterFactor;
+
+        private ReadyToBuild(int maxRetries, Duration firstBackoff, double jitterFactor) {
+            this.maxRetries = maxRetries;
+            this.firstBackoff = firstBackoff;
+            this.jitterFactor = jitterFactor;
+        }
+
+        RetryBackoffConfiguration build() {
+            return new RetryBackoffConfiguration(maxRetries, firstBackoff, jitterFactor);
+        }
+    }
+
+    static RequireMaxRetries builder() {
+        return maxRetries -> firstBackoff -> jitterFactor -> new ReadyToBuild(maxRetries, firstBackoff, jitterFactor);
+    }
+
+    static final double DEFAULT_JITTER_FACTOR = 0.5;
+    static final int DEFAULT_MAX_RETRIES = 3;
+    static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
+    public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration(
+        DEFAULT_MAX_RETRIES,
+        DEFAULT_FIRST_BACKOFF,
+        DEFAULT_JITTER_FACTOR);
+
+    private final int maxRetries;
+    private final Duration firstBackoff;
+    private final double jitterFactor;
+
+    private RetryBackoffConfiguration(int maxRetries, Duration firstBackoff, double jitterFactor) {
+        Preconditions.checkArgument(!firstBackoff.isNegative(), "firstBackoff is not allowed to be negative");
+        Preconditions.checkArgument(maxRetries >= 0, "maxRetries is not allowed to be negative");
+        Preconditions.checkArgument(jitterFactor >= 0 && jitterFactor <= 1.0, "jitterFactor is not " +
+            "allowed to be negative or greater than 1");
+
+        this.maxRetries = maxRetries;
+        this.firstBackoff = firstBackoff;
+        this.jitterFactor = jitterFactor;
+    }
+
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public Duration getFirstBackoff() {
+        return firstBackoff;
+    }
+
+    public double getJitterFactor() {
+        return jitterFactor;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (o instanceof RetryBackoffConfiguration) {
+            RetryBackoffConfiguration that = (RetryBackoffConfiguration) o;
+
+            return Objects.equals(this.maxRetries, that.maxRetries)
+                && Objects.equals(this.jitterFactor, that.jitterFactor)
+                && Objects.equals(this.firstBackoff, that.firstBackoff);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(maxRetries, firstBackoff, jitterFactor);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("maxRetries", maxRetries)
+            .add("firstBackoff", firstBackoff)
+            .add("jitterFactor", jitterFactor)
+            .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index 4b17a6f..3f0cdf7 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.events;
 
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -86,8 +87,8 @@ interface ErrorHandlingContract extends EventBusContract {
         eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
-        assertThat(eventCollector.getEvents())
-            .hasSize(1);
+        WAIT_CONDITION
+            .until(() -> assertThat(eventCollector.getEvents()).hasSize(1));
     }
 
     @Test
@@ -103,12 +104,12 @@ interface ErrorHandlingContract extends EventBusContract {
         eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
-        assertThat(eventCollector.getEvents())
-            .hasSize(1);
+        WAIT_CONDITION
+            .until(() -> assertThat(eventCollector.getEvents()).hasSize(1));
     }
 
     @Test
-    default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() {
+    default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() throws Exception {
         EventCollector eventCollector = eventCollector();
 
         doThrow(new RuntimeException())
@@ -121,32 +122,50 @@ interface ErrorHandlingContract extends EventBusContract {
         eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
+        TimeUnit.SECONDS.sleep(1);
         assertThat(eventCollector.getEvents())
             .isEmpty();
     }
 
     @Test
-    default void retriesBackOffShouldDelayByExponentialGrowth() {
+    default void exceedingMaxRetriesShouldStopConsumingFailedEvent() throws Exception {
+        ThrowingListener throwingListener = throwingListener();
+
+        eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+        eventBus().dispatch(EVENT, NO_KEYS).block();
+
+        TimeUnit.SECONDS.sleep(5);
+        int numberOfCallsAfterExceedMaxRetries = throwingListener.timeElapsed.size();
+        TimeUnit.SECONDS.sleep(5);
+
+        assertThat(throwingListener.timeElapsed.size())
+            .isEqualTo(numberOfCallsAfterExceedMaxRetries);
+    }
+
+    @Test
+    default void retriesBackOffShouldDelayByExponentialGrowth() throws Exception {
         ThrowingListener throwingListener = throwingListener();
 
         eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
+        TimeUnit.SECONDS.sleep(5);
         SoftAssertions.assertSoftly(softly -> {
-            softly.assertThat(throwingListener.timeElapsed).hasSize(4);
+            List<Instant> timeElapsed = throwingListener.timeElapsed;
+            softly.assertThat(timeElapsed).hasSize(4);
 
             long minFirstDelayAfter = 100; // first backOff
             long minSecondDelayAfter = 100; // 200 * jitter factor (200 * 0.5)
             long minThirdDelayAfter = 200; // 400 * jitter factor (400 * 0.5)
 
-            softly.assertThat(throwingListener.timeElapsed.get(1))
-                .isAfterOrEqualTo(throwingListener.timeElapsed.get(0).plusMillis(minFirstDelayAfter));
+            softly.assertThat(timeElapsed.get(1))
+                .isAfterOrEqualTo(timeElapsed.get(0).plusMillis(minFirstDelayAfter));
 
-            softly.assertThat(throwingListener.timeElapsed.get(2))
-                .isAfterOrEqualTo(throwingListener.timeElapsed.get(1).plusMillis(minSecondDelayAfter));
+            softly.assertThat(timeElapsed.get(2))
+                .isAfterOrEqualTo(timeElapsed.get(1).plusMillis(minSecondDelayAfter));
 
-            softly.assertThat(throwingListener.timeElapsed.get(3))
-                .isAfterOrEqualTo(throwingListener.timeElapsed.get(2).plusMillis(minThirdDelayAfter));
+            softly.assertThat(timeElapsed.get(3))
+                .isAfterOrEqualTo(timeElapsed.get(2).plusMillis(minThirdDelayAfter));
         });
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index e6920c0..85c798b 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -37,6 +37,7 @@ import org.apache.james.mailbox.model.TestId;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.jayway.awaitility.Duration;
 import com.jayway.awaitility.core.ConditionFactory;
 
 public interface EventBusTestFixture {
@@ -100,8 +101,7 @@ public interface EventBusTestFixture {
     List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class);
 
     GroupA GROUP_A = new GroupA();
-
-    ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
+    ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
 
     static MailboxListener newListener() {
         MailboxListener listener = mock(MailboxListener.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java
new file mode 100644
index 0000000..2253cd7
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java
@@ -0,0 +1,140 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_FIRST_BACKOFF;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_JITTER_FACTOR;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_MAX_RETRIES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Test;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+class RetryBackoffConfigurationTest {
+
+    @Test
+    void shouldMatchBeanContract() {
+        EqualsVerifier.forClass(RetryBackoffConfiguration.class)
+            .verify();
+    }
+
+    @Test
+    void buildShouldThrowWhenNegativeFirstBackoff() {
+        assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+            .maxRetries(DEFAULT_MAX_RETRIES)
+            .firstBackoff(Duration.ofMillis(-1000L))
+            .jitterFactor(DEFAULT_JITTER_FACTOR)
+            .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("firstBackoff is not allowed to be negative");
+    }
+
+    @Test
+    void buildShouldThrowWhenNegativeMaxRetries() {
+        assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+            .maxRetries(-6)
+            .firstBackoff(DEFAULT_FIRST_BACKOFF)
+            .jitterFactor(DEFAULT_JITTER_FACTOR)
+            .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("maxRetries is not allowed to be negative");
+    }
+
+    @Test
+    void buildShouldThrowWhenNegativeJitterFactor() {
+        assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+            .maxRetries(DEFAULT_MAX_RETRIES)
+            .firstBackoff(DEFAULT_FIRST_BACKOFF)
+            .jitterFactor(-2.5)
+            .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("jitterFactor is not allowed to be negative or greater than 1");
+    }
+
+    @Test
+    void buildShouldThrowWhenGreaterThanOneJitterFactor() {
+        assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+            .maxRetries(DEFAULT_MAX_RETRIES)
+            .firstBackoff(DEFAULT_FIRST_BACKOFF)
+            .jitterFactor(1.000001)
+            .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("jitterFactor is not allowed to be negative or greater than 1");
+    }
+
+    @Test
+    void buildShouldSuccessWhenZeroFirstBackoff() {
+        RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+            .maxRetries(DEFAULT_MAX_RETRIES)
+            .firstBackoff(Duration.ZERO)
+            .jitterFactor(DEFAULT_JITTER_FACTOR)
+            .build();
+
+        assertThat(retryBackoff.getFirstBackoff().toMillis())
+            .isEqualTo(0L);
+    }
+
+    @Test
+    void buildShouldSuccessWhenZeroMaxRetries() {
+        RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+            .maxRetries(0)
+            .firstBackoff(DEFAULT_FIRST_BACKOFF)
+            .jitterFactor(DEFAULT_JITTER_FACTOR)
+            .build();
+
+        assertThat(retryBackoff.getMaxRetries())
+            .isEqualTo(0L);
+    }
+
+    @Test
+    void buildShouldSuccessWhenZeroJitterFactor() {
+        RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+            .maxRetries(DEFAULT_MAX_RETRIES)
+            .firstBackoff(DEFAULT_FIRST_BACKOFF)
+            .jitterFactor(0)
+            .build();
+
+        assertThat(retryBackoff.getJitterFactor())
+            .isEqualTo(0);
+    }
+    @Test
+    void buildShouldReturnCorrespondingValues() {
+        RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+            .maxRetries(5)
+            .firstBackoff(Duration.ofMillis(200))
+            .jitterFactor(0.6)
+            .build();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(retryBackoff.getJitterFactor())
+                .isEqualTo(0.6);
+            softly.assertThat(retryBackoff.getMaxRetries())
+                .isEqualTo(5);
+            softly.assertThat(retryBackoff.getFirstBackoff())
+                .isEqualTo(Duration.ofMillis(200));
+        });
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
new file mode 100644
index 0000000..04a513a
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -0,0 +1,133 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.GroupRegistration.RETRY_COUNT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+
+import org.apache.james.mailbox.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
+
+class GroupConsumerRetry {
+
+    static class RetryExchangeName {
+
+        static RetryExchangeName of(Group group) {
+            return new RetryExchangeName(GroupRegistration.groupName(group.getClass()));
+        }
+
+        static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = MAILBOX_EVENT + "-retryExchange-";
+
+        private final String name;
+
+        private RetryExchangeName(String name) {
+            Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Exchange name must be specified");
+            this.name = name;
+        }
+
+        String asString() {
+            return MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX + name;
+        }
+    }
+
+    static class RetryPublisher {
+
+        private final Sender sender;
+        private final RetryExchangeName retryExchangeName;
+        private final RetryBackoffConfiguration retryBackoff;
+
+        RetryPublisher(Sender sender, RetryExchangeName retryExchangeName, RetryBackoffConfiguration retryBackoff) {
+            this.sender = sender;
+            this.retryExchangeName = retryExchangeName;
+            this.retryBackoff = retryBackoff;
+        }
+
+        Mono<Void> publish(Event event, byte[] eventAsByte, int currentRetryCount) {
+            return sender.send(createRetryMessage(eventAsByte, currentRetryCount))
+                .doOnError(throwable -> LOGGER.error("Exception happens when publishing event of user {} to retry exchange," +
+                        "this event will be lost forever",
+                    event.getUser().asString(), throwable));
+        }
+
+        private Mono<OutboundMessage> createRetryMessage(byte[] eventAsByte, int currentRetryCount) {
+            if (currentRetryCount >= retryBackoff.getMaxRetries()) {
+                return Mono.empty(); // will store event to deadletter latter
+            }
+
+            return Mono.just(new OutboundMessage(
+                retryExchangeName.asString(),
+                EMPTY_ROUTING_KEY,
+                new AMQP.BasicProperties.Builder()
+                    .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
+                    .build(),
+                eventAsByte));
+        }
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupConsumerRetry.class);
+
+    private final Sender sender;
+    private final GroupRegistration.WorkQueueName queueName;
+    private final RetryExchangeName retryExchangeName;
+    private final RetryPublisher retryPublisher;
+
+    GroupConsumerRetry(Sender sender, GroupRegistration.WorkQueueName queueName, Group group,
+                       RetryBackoffConfiguration retryBackoff) {
+        this.sender = sender;
+        this.queueName = queueName;
+        this.retryExchangeName = RetryExchangeName.of(group);
+        this.retryPublisher = new RetryPublisher(sender, retryExchangeName, retryBackoff);
+    }
+
+    Mono<Void> createRetryExchange() {
+        return Flux.concat(
+            sender.declareExchange(ExchangeSpecification.exchange(retryExchangeName.asString())
+                .durable(DURABLE)
+                .type(DIRECT_EXCHANGE)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(retryExchangeName.asString())
+                .queue(queueName.asString())
+                .routingKey(EMPTY_ROUTING_KEY)))
+            .then();
+    }
+
+    Mono<Void> handleRetry(byte[] eventAsBytes, Event event, int currentRetryCount, Throwable throwable) {
+        LOGGER.error("Exception happens when handling event {} of user {}",
+            event.getEventId().getId().toString(), event.getUser().asString(), throwable);
+
+        return retryPublisher.publish(event, eventAsBytes, currentRetryCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 36db6ae..b382dee 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -34,20 +34,18 @@ import java.util.Optional;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.Delivery;
 
-import play.api.libs.json.JsResult;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
 import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.RabbitFlux;
@@ -56,10 +54,11 @@ import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
 class GroupRegistration implements Registration {
+
     static class WorkQueueName {
         @VisibleForTesting
         static WorkQueueName of(Class<? extends Group> clazz) {
-            return new WorkQueueName(clazz.getName());
+            return new WorkQueueName(groupName(clazz));
         }
 
         static WorkQueueName of(Group group) {
@@ -80,7 +79,12 @@ class GroupRegistration implements Registration {
         }
     }
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
+    static String groupName(Class<? extends Group> clazz) {
+        return clazz.getName();
+    }
+
+    static final String RETRY_COUNT = "retry-count";
+    static final int DEFAULT_RETRY_COUNT = 0;
 
     private final MailboxListener mailboxListener;
     private final WorkQueueName queueName;
@@ -88,10 +92,14 @@ class GroupRegistration implements Registration {
     private final Runnable unregisterGroup;
     private final Sender sender;
     private final EventSerializer eventSerializer;
+    private final GroupConsumerRetry retryHandler;
+    private final WaitDelayGenerator delayGenerator;
+    private final RetryBackoffConfiguration retryBackoff;
     private Optional<Disposable> receiverSubscriber;
 
     GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
-                              MailboxListener mailboxListener, Group group, Runnable unregisterGroup) {
+                      MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff,
+                      Runnable unregisterGroup) {
         this.eventSerializer = eventSerializer;
         this.mailboxListener = mailboxListener;
         this.queueName = WorkQueueName.of(group);
@@ -99,10 +107,14 @@ class GroupRegistration implements Registration {
         this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
         this.receiverSubscriber = Optional.empty();
         this.unregisterGroup = unregisterGroup;
+        this.retryHandler = new GroupConsumerRetry(sender, queueName, group, retryBackoff);
+        this.retryBackoff = retryBackoff;
+        this.delayGenerator = WaitDelayGenerator.of(retryBackoff);
     }
 
     GroupRegistration start() {
         createGroupWorkQueue()
+            .then(retryHandler.createRetryExchange())
             .doOnSuccess(any -> this.subscribeWorkQueue())
             .block();
         return this;
@@ -123,23 +135,33 @@ class GroupRegistration implements Registration {
     }
 
     private void subscribeWorkQueue() {
-        receiverSubscriber = Optional.of(receiver.consumeAutoAck(queueName.asString())
+        receiverSubscriber = Optional.of(receiver.consumeManualAck(queueName.asString())
             .subscribeOn(Schedulers.parallel())
-            .map(Delivery::getBody)
-            .filter(Objects::nonNull)
-            .map(eventInBytes -> new String(eventInBytes, StandardCharsets.UTF_8))
-            .map(eventSerializer::fromJson)
-            .map(JsResult::get)
+            .filter(delivery -> Objects.nonNull(delivery.getBody()))
+            .flatMap(this::deliver)
             .subscribeOn(Schedulers.elastic())
-            .subscribe(event -> deliverEvent(mailboxListener, event)));
+            .subscribe());
     }
 
-    private void deliverEvent(MailboxListener mailboxListener, Event event) {
-        try {
-            mailboxListener.event(event);
-        } catch (Exception e) {
-            LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e);
-        }
+    private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
+        byte[] eventAsBytes = acknowledgableDelivery.getBody();
+        Event event = eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get();
+        int currentRetryCount = getRetryCount(acknowledgableDelivery);
+
+        return delayGenerator.delayIfHaveTo(currentRetryCount)
+            .flatMap(any -> Mono.fromRunnable(() -> mailboxListener.event(event)))
+            .onErrorResume(throwable -> retryHandler.handleRetry(eventAsBytes, event, currentRetryCount, throwable))
+            .then(Mono.fromRunnable(acknowledgableDelivery::ack))
+            .subscribeWith(MonoProcessor.create())
+            .then();
+    }
+
+    static int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) {
+        return Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders())
+            .flatMap(headers -> Optional.ofNullable(headers.get(RETRY_COUNT)))
+            .filter(object -> object instanceof Integer)
+            .map(object -> (Integer) object)
+            .orElse(DEFAULT_RETRY_COUNT);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
index c0f4339..33ba13b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -35,11 +35,13 @@ class GroupRegistrationHandler {
     private final EventSerializer eventSerializer;
     private final Sender sender;
     private final Mono<Connection> connectionMono;
+    private final RetryBackoffConfiguration retryBackoff;
 
-    GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono) {
+    GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RetryBackoffConfiguration retryBackoff) {
         this.eventSerializer = eventSerializer;
         this.sender = sender;
         this.connectionMono = connectionMono;
+        this.retryBackoff = retryBackoff;
         this.groupRegistrations = new ConcurrentHashMap<>();
     }
 
@@ -65,6 +67,7 @@ class GroupRegistrationHandler {
             eventSerializer,
             listener,
             group,
+            retryBackoff,
             () -> groupRegistrations.remove(group));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 447b345..e9e594e 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -44,24 +44,28 @@ class RabbitMQEventBus implements EventBus {
     private final EventSerializer eventSerializer;
     private final AtomicBoolean isRunning;
     private final RoutingKeyConverter routingKeyConverter;
+    private final RetryBackoffConfiguration retryBackoff;
 
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
     private EventDispatcher eventDispatcher;
     private Sender sender;
 
-    RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer, RoutingKeyConverter routingKeyConverter) {
+    RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer,
+                     RetryBackoffConfiguration retryBackoff,
+                     RoutingKeyConverter routingKeyConverter) {
         this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
         this.eventSerializer = eventSerializer;
         this.routingKeyConverter = routingKeyConverter;
+        this.retryBackoff = retryBackoff;
         this.isRunning = new AtomicBoolean(false);
     }
 
     public void start() {
         if (!isRunning.get()) {
             sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
-            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
             keyRegistrationHandler = new KeyRegistrationHandler(eventSerializer, sender, connectionMono, routingKeyConverter);
+            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff);
             eventDispatcher = new EventDispatcher(eventSerializer, sender);
 
             eventDispatcher.start();

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
index fdcf9cc..93a20df 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
@@ -29,37 +29,39 @@ import reactor.core.publisher.Mono;
 
 class WaitDelayGenerator {
 
-    static WaitDelayGenerator of(RetryBackoff retryBackoff) {
+    static WaitDelayGenerator of(RetryBackoffConfiguration retryBackoff) {
         return new WaitDelayGenerator(retryBackoff);
     }
 
     private static int randomBetween(int lowest, int highest) {
         Preconditions.checkArgument(lowest <= highest, "lowest always has to be less than or equals highest");
+        if (lowest == highest) {
+            return lowest;
+        }
         return SECURE_RANDOM.nextInt(highest - lowest) + lowest;
     }
 
     private static final SecureRandom SECURE_RANDOM = new SecureRandom();
 
-    private final RetryBackoff retryBackoff;
+    private final RetryBackoffConfiguration retryBackoff;
 
-    private WaitDelayGenerator(RetryBackoff retryBackoff) {
+    private WaitDelayGenerator(RetryBackoffConfiguration retryBackoff) {
         this.retryBackoff = retryBackoff;
     }
 
     Mono<Integer> delayIfHaveTo(int retryCount) {
         Mono<Integer> countRetryMono = Mono.just(retryCount);
-        if (retryCount < 1) {
+        if (!shouldDelay(retryCount)) {
             return countRetryMono;
         }
 
         return countRetryMono
-            .filter(count -> count <= retryBackoff.getMaxRetries())
             .delayElement(generateDelay(retryCount));
     }
 
     @VisibleForTesting
     Duration generateDelay(int retryCount) {
-        if (retryCount < 1) {
+        if (!shouldDelay(retryCount)) {
             return Duration.ZERO;
         }
         int exponentialFactor = Double.valueOf(Math.pow(2, retryCount - 1)).intValue();
@@ -68,4 +70,8 @@ class WaitDelayGenerator {
 
         return Duration.ofMillis(randomBetween(minDelay, maxDelay));
     }
+
+    private boolean shouldDelay(int retryCount) {
+        return retryCount >= 1 && retryCount <= retryBackoff.getMaxRetries();
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index dbb125a..ffcdac8 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -32,6 +32,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -77,7 +78,8 @@ import reactor.rabbitmq.SenderOptions;
 
 class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
     EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
-    KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract {
+    KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract,
+    ErrorHandlingContract {
 
     static class RabbitMQEventExtension implements BeforeEachCallback, AfterEachCallback {
         static final RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
@@ -131,9 +133,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
         routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory));
 
-        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
-        eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
-        eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
+        eventBus = newEventBus();
+        eventBus2 = newEventBus();
+        eventBus3 = newEventBus();
+
         eventBus.start();
         eventBus2.start();
         eventBus3.start();
@@ -152,6 +155,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         sender.close();
     }
 
+    private RabbitMQEventBus newEventBus() {
+        return new RabbitMQEventBus(connectionFactory, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter);
+    }
+
     @Override
     public EventBus eventBus() {
         return eventBus;
@@ -181,6 +188,17 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
 
     }
 
+    @Test
+    void registerGroupShouldCreateRetryExchange() throws Exception {
+        MailboxListener listener = newListener();
+        EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
+        eventBus.register(listener, registeredGroup);
+
+        GroupConsumerRetry.RetryExchangeName retryExchangeName = GroupConsumerRetry.RetryExchangeName.of(registeredGroup);
+        assertThat(testExtension.rabbitMQExtension.managementAPI().listExchanges())
+            .anyMatch(exchange -> exchange.getName().equals(retryExchangeName.asString()));
+    }
+
     @Nested
     class PublishingTest {
         private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";

http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
index acc3a9f..9cb2d62 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
@@ -30,7 +30,7 @@ class WaitDelayGeneratorTest {
 
     @Test
     void generateDelayShouldReturnZeroWhenZeroRetryCount() {
-        WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoff.defaultRetryBackoff());
+        WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.DEFAULT);
 
         assertThat(generator.generateDelay(0))
             .isEqualTo(Duration.ofMillis(0));
@@ -38,7 +38,7 @@ class WaitDelayGeneratorTest {
 
     @Test
     void generateDelayShouldReturnByRandomInRangeOfExponentialGrowthOfRetryCount() {
-        WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoff.builder()
+        WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
             .maxRetries(4)
             .firstBackoff(Duration.ofMillis(100))
             .jitterFactor(0.5)
@@ -55,4 +55,49 @@ class WaitDelayGeneratorTest {
                 .isBetween(800L, 1200L);
         });
     }
+
+    @Test
+    void generateDelayShouldReturnZeroWhenZeroMaxRetries() {
+        WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
+            .maxRetries(0)
+            .firstBackoff(Duration.ofMillis(1000))
+            .jitterFactor(0.5)
+            .build());
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ZERO);
+            softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ZERO);
+            softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ZERO);
+        });
+    }
+
+    @Test
+    void generateDelayShouldReturnZeroWhenZeroFirstBackOff() {
+        WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
+            .maxRetries(3)
+            .firstBackoff(Duration.ZERO)
+            .jitterFactor(0.5)
+            .build());
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ZERO);
+            softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ZERO);
+            softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ZERO);
+        });
+    }
+
+    @Test
+    void generateDelayShouldReturnFloorOfExponentialGrowthStepsWhenZeroJitterFactor() {
+        WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
+            .maxRetries(3)
+            .firstBackoff(Duration.ofMillis(100))
+            .jitterFactor(0.0)
+            .build());
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ofMillis(100));
+            softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ofMillis(200));
+            softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ofMillis(400));
+        });
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org