You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/16 11:11:51 UTC
[3/3] james-project git commit: MAILBOX-372 handle retrying for event
group consumers
MAILBOX-372 handle retrying for event group consumers
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/1f390f4f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/1f390f4f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/1f390f4f
Branch: refs/heads/master
Commit: 1f390f4f3f69db1c073b5e018b25cdf52acb3647
Parents: bd656cf
Author: tran tien duc <dt...@linagora.com>
Authored: Tue Jan 15 14:58:45 2019 +0700
Committer: tran tien duc <dt...@linagora.com>
Committed: Wed Jan 16 14:21:28 2019 +0700
----------------------------------------------------------------------
.../james/mailbox/events/RetryBackoff.java | 110 ---------------
.../events/RetryBackoffConfiguration.java | 125 +++++++++++++++++
.../mailbox/events/ErrorHandlingContract.java | 45 ++++--
.../mailbox/events/EventBusTestFixture.java | 4 +-
.../events/RetryBackoffConfigurationTest.java | 140 +++++++++++++++++++
.../mailbox/events/GroupConsumerRetry.java | 133 ++++++++++++++++++
.../james/mailbox/events/GroupRegistration.java | 62 +++++---
.../events/GroupRegistrationHandler.java | 5 +-
.../james/mailbox/events/RabbitMQEventBus.java | 8 +-
.../mailbox/events/WaitDelayGenerator.java | 18 ++-
.../mailbox/events/RabbitMQEventBusTest.java | 26 +++-
.../mailbox/events/WaitDelayGeneratorTest.java | 49 ++++++-
12 files changed, 565 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java
deleted file mode 100644
index 2fa15ad..0000000
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events;
-
-import java.time.Duration;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-class RetryBackoff {
-
- @FunctionalInterface
- interface RequireMaxRetries {
- RequireFirstBackoff maxRetries(int maxRetries);
- }
-
- @FunctionalInterface
- interface RequireFirstBackoff {
- RequireJitterFactor firstBackoff(Duration firstBackoff);
- }
-
- @FunctionalInterface
- interface RequireJitterFactor {
- ReadyToBuild jitterFactor(double jitterFactor);
- }
-
- static class ReadyToBuild {
- private final int maxRetries;
- private final Duration firstBackoff;
- private final double jitterFactor;
-
- private ReadyToBuild(int maxRetries, Duration firstBackoff, double jitterFactor) {
- this.maxRetries = maxRetries;
- this.firstBackoff = firstBackoff;
- this.jitterFactor = jitterFactor;
- }
-
- RetryBackoff build() {
- return new RetryBackoff(maxRetries, firstBackoff, jitterFactor);
- }
- }
-
- static RequireMaxRetries builder() {
- return maxRetries -> firstBackoff -> jitterFactor -> new ReadyToBuild(maxRetries, firstBackoff, jitterFactor);
- }
-
- static RetryBackoff defaultRetryBackoff() {
- return builder()
- .maxRetries(DEFAULT_MAX_RETRIES)
- .firstBackoff(DEFAULT_FIRST_BACKOFF)
- .jitterFactor(DEFAULT_JITTER_FACTOR)
- .build();
- }
-
- private static final double DEFAULT_JITTER_FACTOR = 0.5;
- private static final int DEFAULT_MAX_RETRIES = 3;
- private static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
-
- private final int maxRetries;
- private final Duration firstBackoff;
- private final double jitterFactor;
-
- RetryBackoff(int maxRetries, Duration firstBackoff, double jitterFactor) {
- Preconditions.checkArgument(!firstBackoff.isNegative() && !firstBackoff.isZero(), "firstBackoff has to be strictly positive");
- Preconditions.checkArgument(maxRetries > 0, "maxRetries has to be strictly positive");
- Preconditions.checkArgument(jitterFactor > 0, "jitterFactor has to be strictly positive");
-
- this.maxRetries = maxRetries;
- this.firstBackoff = firstBackoff;
- this.jitterFactor = jitterFactor;
- }
-
- public int getMaxRetries() {
- return maxRetries;
- }
-
- public Duration getFirstBackoff() {
- return firstBackoff;
- }
-
- public double getJitterFactor() {
- return jitterFactor;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("maxRetries", maxRetries)
- .add("firstBackoff", firstBackoff)
- .add("jitterFactor", jitterFactor)
- .toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
new file mode 100644
index 0000000..ef16efb
--- /dev/null
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -0,0 +1,125 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
+class RetryBackoffConfiguration {
+
+ @FunctionalInterface
+ interface RequireMaxRetries {
+ RequireFirstBackoff maxRetries(int maxRetries);
+ }
+
+ @FunctionalInterface
+ interface RequireFirstBackoff {
+ RequireJitterFactor firstBackoff(Duration firstBackoff);
+ }
+
+ @FunctionalInterface
+ interface RequireJitterFactor {
+ ReadyToBuild jitterFactor(double jitterFactor);
+ }
+
+ static class ReadyToBuild {
+ private final int maxRetries;
+ private final Duration firstBackoff;
+ private final double jitterFactor;
+
+ private ReadyToBuild(int maxRetries, Duration firstBackoff, double jitterFactor) {
+ this.maxRetries = maxRetries;
+ this.firstBackoff = firstBackoff;
+ this.jitterFactor = jitterFactor;
+ }
+
+ RetryBackoffConfiguration build() {
+ return new RetryBackoffConfiguration(maxRetries, firstBackoff, jitterFactor);
+ }
+ }
+
+ static RequireMaxRetries builder() {
+ return maxRetries -> firstBackoff -> jitterFactor -> new ReadyToBuild(maxRetries, firstBackoff, jitterFactor);
+ }
+
+ static final double DEFAULT_JITTER_FACTOR = 0.5;
+ static final int DEFAULT_MAX_RETRIES = 3;
+ static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
+ public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration(
+ DEFAULT_MAX_RETRIES,
+ DEFAULT_FIRST_BACKOFF,
+ DEFAULT_JITTER_FACTOR);
+
+ private final int maxRetries;
+ private final Duration firstBackoff;
+ private final double jitterFactor;
+
+ private RetryBackoffConfiguration(int maxRetries, Duration firstBackoff, double jitterFactor) {
+ Preconditions.checkArgument(!firstBackoff.isNegative(), "firstBackoff is not allowed to be negative");
+ Preconditions.checkArgument(maxRetries >= 0, "maxRetries is not allowed to be negative");
+ Preconditions.checkArgument(jitterFactor >= 0 && jitterFactor <= 1.0, "jitterFactor is not " +
+ "allowed to be negative or greater than 1");
+
+ this.maxRetries = maxRetries;
+ this.firstBackoff = firstBackoff;
+ this.jitterFactor = jitterFactor;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public Duration getFirstBackoff() {
+ return firstBackoff;
+ }
+
+ public double getJitterFactor() {
+ return jitterFactor;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof RetryBackoffConfiguration) {
+ RetryBackoffConfiguration that = (RetryBackoffConfiguration) o;
+
+ return Objects.equals(this.maxRetries, that.maxRetries)
+ && Objects.equals(this.jitterFactor, that.jitterFactor)
+ && Objects.equals(this.firstBackoff, that.firstBackoff);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(maxRetries, firstBackoff, jitterFactor);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("maxRetries", maxRetries)
+ .add("firstBackoff", firstBackoff)
+ .add("jitterFactor", jitterFactor)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index 4b17a6f..3f0cdf7 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.events;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -86,8 +87,8 @@ interface ErrorHandlingContract extends EventBusContract {
eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
eventBus().dispatch(EVENT, NO_KEYS).block();
- assertThat(eventCollector.getEvents())
- .hasSize(1);
+ WAIT_CONDITION
+ .until(() -> assertThat(eventCollector.getEvents()).hasSize(1));
}
@Test
@@ -103,12 +104,12 @@ interface ErrorHandlingContract extends EventBusContract {
eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
eventBus().dispatch(EVENT, NO_KEYS).block();
- assertThat(eventCollector.getEvents())
- .hasSize(1);
+ WAIT_CONDITION
+ .until(() -> assertThat(eventCollector.getEvents()).hasSize(1));
}
@Test
- default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() {
+ default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() throws Exception {
EventCollector eventCollector = eventCollector();
doThrow(new RuntimeException())
@@ -121,32 +122,50 @@ interface ErrorHandlingContract extends EventBusContract {
eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
eventBus().dispatch(EVENT, NO_KEYS).block();
+ TimeUnit.SECONDS.sleep(1);
assertThat(eventCollector.getEvents())
.isEmpty();
}
@Test
- default void retriesBackOffShouldDelayByExponentialGrowth() {
+ default void exceedingMaxRetriesShouldStopConsumingFailedEvent() throws Exception {
+ ThrowingListener throwingListener = throwingListener();
+
+ eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ TimeUnit.SECONDS.sleep(5);
+ int numberOfCallsAfterExceedMaxRetries = throwingListener.timeElapsed.size();
+ TimeUnit.SECONDS.sleep(5);
+
+ assertThat(throwingListener.timeElapsed.size())
+ .isEqualTo(numberOfCallsAfterExceedMaxRetries);
+ }
+
+ @Test
+ default void retriesBackOffShouldDelayByExponentialGrowth() throws Exception {
ThrowingListener throwingListener = throwingListener();
eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
eventBus().dispatch(EVENT, NO_KEYS).block();
+ TimeUnit.SECONDS.sleep(5);
SoftAssertions.assertSoftly(softly -> {
- softly.assertThat(throwingListener.timeElapsed).hasSize(4);
+ List<Instant> timeElapsed = throwingListener.timeElapsed;
+ softly.assertThat(timeElapsed).hasSize(4);
long minFirstDelayAfter = 100; // first backOff
long minSecondDelayAfter = 100; // 200 * jitter factor (200 * 0.5)
long minThirdDelayAfter = 200; // 400 * jitter factor (400 * 0.5)
- softly.assertThat(throwingListener.timeElapsed.get(1))
- .isAfterOrEqualTo(throwingListener.timeElapsed.get(0).plusMillis(minFirstDelayAfter));
+ softly.assertThat(timeElapsed.get(1))
+ .isAfterOrEqualTo(timeElapsed.get(0).plusMillis(minFirstDelayAfter));
- softly.assertThat(throwingListener.timeElapsed.get(2))
- .isAfterOrEqualTo(throwingListener.timeElapsed.get(1).plusMillis(minSecondDelayAfter));
+ softly.assertThat(timeElapsed.get(2))
+ .isAfterOrEqualTo(timeElapsed.get(1).plusMillis(minSecondDelayAfter));
- softly.assertThat(throwingListener.timeElapsed.get(3))
- .isAfterOrEqualTo(throwingListener.timeElapsed.get(2).plusMillis(minThirdDelayAfter));
+ softly.assertThat(timeElapsed.get(3))
+ .isAfterOrEqualTo(timeElapsed.get(2).plusMillis(minThirdDelayAfter));
});
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index e6920c0..85c798b 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -37,6 +37,7 @@ import org.apache.james.mailbox.model.TestId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.jayway.awaitility.Duration;
import com.jayway.awaitility.core.ConditionFactory;
public interface EventBusTestFixture {
@@ -100,8 +101,7 @@ public interface EventBusTestFixture {
List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class);
GroupA GROUP_A = new GroupA();
-
- ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
+ ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
static MailboxListener newListener() {
MailboxListener listener = mock(MailboxListener.class);
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java
new file mode 100644
index 0000000..2253cd7
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java
@@ -0,0 +1,140 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_FIRST_BACKOFF;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_JITTER_FACTOR;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_MAX_RETRIES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Test;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+class RetryBackoffConfigurationTest {
+
+ @Test
+ void shouldMatchBeanContract() {
+ EqualsVerifier.forClass(RetryBackoffConfiguration.class)
+ .verify();
+ }
+
+ @Test
+ void buildShouldThrowWhenNegativeFirstBackoff() {
+ assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+ .maxRetries(DEFAULT_MAX_RETRIES)
+ .firstBackoff(Duration.ofMillis(-1000L))
+ .jitterFactor(DEFAULT_JITTER_FACTOR)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("firstBackoff is not allowed to be negative");
+ }
+
+ @Test
+ void buildShouldThrowWhenNegativeMaxRetries() {
+ assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+ .maxRetries(-6)
+ .firstBackoff(DEFAULT_FIRST_BACKOFF)
+ .jitterFactor(DEFAULT_JITTER_FACTOR)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("maxRetries is not allowed to be negative");
+ }
+
+ @Test
+ void buildShouldThrowWhenNegativeJitterFactor() {
+ assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+ .maxRetries(DEFAULT_MAX_RETRIES)
+ .firstBackoff(DEFAULT_FIRST_BACKOFF)
+ .jitterFactor(-2.5)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("jitterFactor is not allowed to be negative or greater than 1");
+ }
+
+ @Test
+ void buildShouldThrowWhenGreaterThanOneJitterFactor() {
+ assertThatThrownBy(() -> RetryBackoffConfiguration.builder()
+ .maxRetries(DEFAULT_MAX_RETRIES)
+ .firstBackoff(DEFAULT_FIRST_BACKOFF)
+ .jitterFactor(1.000001)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("jitterFactor is not allowed to be negative or greater than 1");
+ }
+
+ @Test
+ void buildShouldSuccessWhenZeroFirstBackoff() {
+ RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+ .maxRetries(DEFAULT_MAX_RETRIES)
+ .firstBackoff(Duration.ZERO)
+ .jitterFactor(DEFAULT_JITTER_FACTOR)
+ .build();
+
+ assertThat(retryBackoff.getFirstBackoff().toMillis())
+ .isEqualTo(0L);
+ }
+
+ @Test
+ void buildShouldSuccessWhenZeroMaxRetries() {
+ RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+ .maxRetries(0)
+ .firstBackoff(DEFAULT_FIRST_BACKOFF)
+ .jitterFactor(DEFAULT_JITTER_FACTOR)
+ .build();
+
+ assertThat(retryBackoff.getMaxRetries())
+ .isEqualTo(0L);
+ }
+
+ @Test
+ void buildShouldSuccessWhenZeroJitterFactor() {
+ RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+ .maxRetries(DEFAULT_MAX_RETRIES)
+ .firstBackoff(DEFAULT_FIRST_BACKOFF)
+ .jitterFactor(0)
+ .build();
+
+ assertThat(retryBackoff.getJitterFactor())
+ .isEqualTo(0);
+ }
+ @Test
+ void buildShouldReturnCorrespondingValues() {
+ RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder()
+ .maxRetries(5)
+ .firstBackoff(Duration.ofMillis(200))
+ .jitterFactor(0.6)
+ .build();
+
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(retryBackoff.getJitterFactor())
+ .isEqualTo(0.6);
+ softly.assertThat(retryBackoff.getMaxRetries())
+ .isEqualTo(5);
+ softly.assertThat(retryBackoff.getFirstBackoff())
+ .isEqualTo(Duration.ofMillis(200));
+ });
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
new file mode 100644
index 0000000..04a513a
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -0,0 +1,133 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.GroupRegistration.RETRY_COUNT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+
+import org.apache.james.mailbox.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
+
+class GroupConsumerRetry {
+
+ static class RetryExchangeName {
+
+ static RetryExchangeName of(Group group) {
+ return new RetryExchangeName(GroupRegistration.groupName(group.getClass()));
+ }
+
+ static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = MAILBOX_EVENT + "-retryExchange-";
+
+ private final String name;
+
+ private RetryExchangeName(String name) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Exchange name must be specified");
+ this.name = name;
+ }
+
+ String asString() {
+ return MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX + name;
+ }
+ }
+
+ static class RetryPublisher {
+
+ private final Sender sender;
+ private final RetryExchangeName retryExchangeName;
+ private final RetryBackoffConfiguration retryBackoff;
+
+ RetryPublisher(Sender sender, RetryExchangeName retryExchangeName, RetryBackoffConfiguration retryBackoff) {
+ this.sender = sender;
+ this.retryExchangeName = retryExchangeName;
+ this.retryBackoff = retryBackoff;
+ }
+
+ Mono<Void> publish(Event event, byte[] eventAsByte, int currentRetryCount) {
+ return sender.send(createRetryMessage(eventAsByte, currentRetryCount))
+ .doOnError(throwable -> LOGGER.error("Exception happens when publishing event of user {} to retry exchange," +
+ "this event will be lost forever",
+ event.getUser().asString(), throwable));
+ }
+
+ private Mono<OutboundMessage> createRetryMessage(byte[] eventAsByte, int currentRetryCount) {
+ if (currentRetryCount >= retryBackoff.getMaxRetries()) {
+ return Mono.empty(); // will store event to deadletter latter
+ }
+
+ return Mono.just(new OutboundMessage(
+ retryExchangeName.asString(),
+ EMPTY_ROUTING_KEY,
+ new AMQP.BasicProperties.Builder()
+ .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
+ .build(),
+ eventAsByte));
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupConsumerRetry.class);
+
+ private final Sender sender;
+ private final GroupRegistration.WorkQueueName queueName;
+ private final RetryExchangeName retryExchangeName;
+ private final RetryPublisher retryPublisher;
+
+ GroupConsumerRetry(Sender sender, GroupRegistration.WorkQueueName queueName, Group group,
+ RetryBackoffConfiguration retryBackoff) {
+ this.sender = sender;
+ this.queueName = queueName;
+ this.retryExchangeName = RetryExchangeName.of(group);
+ this.retryPublisher = new RetryPublisher(sender, retryExchangeName, retryBackoff);
+ }
+
+ Mono<Void> createRetryExchange() {
+ return Flux.concat(
+ sender.declareExchange(ExchangeSpecification.exchange(retryExchangeName.asString())
+ .durable(DURABLE)
+ .type(DIRECT_EXCHANGE)),
+ sender.bind(BindingSpecification.binding()
+ .exchange(retryExchangeName.asString())
+ .queue(queueName.asString())
+ .routingKey(EMPTY_ROUTING_KEY)))
+ .then();
+ }
+
+ Mono<Void> handleRetry(byte[] eventAsBytes, Event event, int currentRetryCount, Throwable throwable) {
+ LOGGER.error("Exception happens when handling event {} of user {}",
+ event.getEventId().getId().toString(), event.getUser().asString(), throwable);
+
+ return retryPublisher.publish(event, eventAsBytes, currentRetryCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 36db6ae..b382dee 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -34,20 +34,18 @@ import java.util.Optional;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.Delivery;
-import play.api.libs.json.JsResult;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
@@ -56,10 +54,11 @@ import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
class GroupRegistration implements Registration {
+
static class WorkQueueName {
@VisibleForTesting
static WorkQueueName of(Class<? extends Group> clazz) {
- return new WorkQueueName(clazz.getName());
+ return new WorkQueueName(groupName(clazz));
}
static WorkQueueName of(Group group) {
@@ -80,7 +79,12 @@ class GroupRegistration implements Registration {
}
}
- private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
+ static String groupName(Class<? extends Group> clazz) {
+ return clazz.getName();
+ }
+
+ static final String RETRY_COUNT = "retry-count";
+ static final int DEFAULT_RETRY_COUNT = 0;
private final MailboxListener mailboxListener;
private final WorkQueueName queueName;
@@ -88,10 +92,14 @@ class GroupRegistration implements Registration {
private final Runnable unregisterGroup;
private final Sender sender;
private final EventSerializer eventSerializer;
+ private final GroupConsumerRetry retryHandler;
+ private final WaitDelayGenerator delayGenerator;
+ private final RetryBackoffConfiguration retryBackoff;
private Optional<Disposable> receiverSubscriber;
GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
- MailboxListener mailboxListener, Group group, Runnable unregisterGroup) {
+ MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff,
+ Runnable unregisterGroup) {
this.eventSerializer = eventSerializer;
this.mailboxListener = mailboxListener;
this.queueName = WorkQueueName.of(group);
@@ -99,10 +107,14 @@ class GroupRegistration implements Registration {
this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
this.receiverSubscriber = Optional.empty();
this.unregisterGroup = unregisterGroup;
+ this.retryHandler = new GroupConsumerRetry(sender, queueName, group, retryBackoff);
+ this.retryBackoff = retryBackoff;
+ this.delayGenerator = WaitDelayGenerator.of(retryBackoff);
}
GroupRegistration start() {
createGroupWorkQueue()
+ .then(retryHandler.createRetryExchange())
.doOnSuccess(any -> this.subscribeWorkQueue())
.block();
return this;
@@ -123,23 +135,33 @@ class GroupRegistration implements Registration {
}
private void subscribeWorkQueue() {
- receiverSubscriber = Optional.of(receiver.consumeAutoAck(queueName.asString())
+ receiverSubscriber = Optional.of(receiver.consumeManualAck(queueName.asString())
.subscribeOn(Schedulers.parallel())
- .map(Delivery::getBody)
- .filter(Objects::nonNull)
- .map(eventInBytes -> new String(eventInBytes, StandardCharsets.UTF_8))
- .map(eventSerializer::fromJson)
- .map(JsResult::get)
+ .filter(delivery -> Objects.nonNull(delivery.getBody()))
+ .flatMap(this::deliver)
.subscribeOn(Schedulers.elastic())
- .subscribe(event -> deliverEvent(mailboxListener, event)));
+ .subscribe());
}
- private void deliverEvent(MailboxListener mailboxListener, Event event) {
- try {
- mailboxListener.event(event);
- } catch (Exception e) {
- LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e);
- }
+ private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
+ byte[] eventAsBytes = acknowledgableDelivery.getBody();
+ Event event = eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get();
+ int currentRetryCount = getRetryCount(acknowledgableDelivery);
+
+ return delayGenerator.delayIfHaveTo(currentRetryCount)
+ .flatMap(any -> Mono.fromRunnable(() -> mailboxListener.event(event)))
+ .onErrorResume(throwable -> retryHandler.handleRetry(eventAsBytes, event, currentRetryCount, throwable))
+ .then(Mono.fromRunnable(acknowledgableDelivery::ack))
+ .subscribeWith(MonoProcessor.create())
+ .then();
+ }
+
+ static int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) {
+ return Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders())
+ .flatMap(headers -> Optional.ofNullable(headers.get(RETRY_COUNT)))
+ .filter(object -> object instanceof Integer)
+ .map(object -> (Integer) object)
+ .orElse(DEFAULT_RETRY_COUNT);
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
index c0f4339..33ba13b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -35,11 +35,13 @@ class GroupRegistrationHandler {
private final EventSerializer eventSerializer;
private final Sender sender;
private final Mono<Connection> connectionMono;
+ private final RetryBackoffConfiguration retryBackoff;
- GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono) {
+ GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RetryBackoffConfiguration retryBackoff) {
this.eventSerializer = eventSerializer;
this.sender = sender;
this.connectionMono = connectionMono;
+ this.retryBackoff = retryBackoff;
this.groupRegistrations = new ConcurrentHashMap<>();
}
@@ -65,6 +67,7 @@ class GroupRegistrationHandler {
eventSerializer,
listener,
group,
+ retryBackoff,
() -> groupRegistrations.remove(group));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 447b345..e9e594e 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -44,24 +44,28 @@ class RabbitMQEventBus implements EventBus {
private final EventSerializer eventSerializer;
private final AtomicBoolean isRunning;
private final RoutingKeyConverter routingKeyConverter;
+ private final RetryBackoffConfiguration retryBackoff;
private GroupRegistrationHandler groupRegistrationHandler;
private KeyRegistrationHandler keyRegistrationHandler;
private EventDispatcher eventDispatcher;
private Sender sender;
- RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer, RoutingKeyConverter routingKeyConverter) {
+ RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer,
+ RetryBackoffConfiguration retryBackoff,
+ RoutingKeyConverter routingKeyConverter) {
this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
this.eventSerializer = eventSerializer;
this.routingKeyConverter = routingKeyConverter;
+ this.retryBackoff = retryBackoff;
this.isRunning = new AtomicBoolean(false);
}
public void start() {
if (!isRunning.get()) {
sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
keyRegistrationHandler = new KeyRegistrationHandler(eventSerializer, sender, connectionMono, routingKeyConverter);
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff);
eventDispatcher = new EventDispatcher(eventSerializer, sender);
eventDispatcher.start();
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
index fdcf9cc..93a20df 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
@@ -29,37 +29,39 @@ import reactor.core.publisher.Mono;
class WaitDelayGenerator {
- static WaitDelayGenerator of(RetryBackoff retryBackoff) {
+ static WaitDelayGenerator of(RetryBackoffConfiguration retryBackoff) {
return new WaitDelayGenerator(retryBackoff);
}
private static int randomBetween(int lowest, int highest) {
Preconditions.checkArgument(lowest <= highest, "lowest always has to be less than or equals highest");
+ if (lowest == highest) {
+ return lowest;
+ }
return SECURE_RANDOM.nextInt(highest - lowest) + lowest;
}
private static final SecureRandom SECURE_RANDOM = new SecureRandom();
- private final RetryBackoff retryBackoff;
+ private final RetryBackoffConfiguration retryBackoff;
- private WaitDelayGenerator(RetryBackoff retryBackoff) {
+ private WaitDelayGenerator(RetryBackoffConfiguration retryBackoff) {
this.retryBackoff = retryBackoff;
}
Mono<Integer> delayIfHaveTo(int retryCount) {
Mono<Integer> countRetryMono = Mono.just(retryCount);
- if (retryCount < 1) {
+ if (!shouldDelay(retryCount)) {
return countRetryMono;
}
return countRetryMono
- .filter(count -> count <= retryBackoff.getMaxRetries())
.delayElement(generateDelay(retryCount));
}
@VisibleForTesting
Duration generateDelay(int retryCount) {
- if (retryCount < 1) {
+ if (!shouldDelay(retryCount)) {
return Duration.ZERO;
}
int exponentialFactor = Double.valueOf(Math.pow(2, retryCount - 1)).intValue();
@@ -68,4 +70,8 @@ class WaitDelayGenerator {
return Duration.ofMillis(randomBetween(minDelay, maxDelay));
}
+
+ private boolean shouldDelay(int retryCount) {
+ return retryCount >= 1 && retryCount <= retryBackoff.getMaxRetries();
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index dbb125a..ffcdac8 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -32,6 +32,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
@@ -77,7 +78,8 @@ import reactor.rabbitmq.SenderOptions;
class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
- KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract {
+ KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract,
+ ErrorHandlingContract {
static class RabbitMQEventExtension implements BeforeEachCallback, AfterEachCallback {
static final RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
@@ -131,9 +133,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory));
- eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
- eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
- eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
+ eventBus = newEventBus();
+ eventBus2 = newEventBus();
+ eventBus3 = newEventBus();
+
eventBus.start();
eventBus2.start();
eventBus3.start();
@@ -152,6 +155,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
sender.close();
}
+ private RabbitMQEventBus newEventBus() {
+ return new RabbitMQEventBus(connectionFactory, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter);
+ }
+
@Override
public EventBus eventBus() {
return eventBus;
@@ -181,6 +188,17 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
+ @Test
+ void registerGroupShouldCreateRetryExchange() throws Exception {
+ MailboxListener listener = newListener();
+ EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
+ eventBus.register(listener, registeredGroup);
+
+ GroupConsumerRetry.RetryExchangeName retryExchangeName = GroupConsumerRetry.RetryExchangeName.of(registeredGroup);
+ assertThat(testExtension.rabbitMQExtension.managementAPI().listExchanges())
+ .anyMatch(exchange -> exchange.getName().equals(retryExchangeName.asString()));
+ }
+
@Nested
class PublishingTest {
private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
index acc3a9f..9cb2d62 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
@@ -30,7 +30,7 @@ class WaitDelayGeneratorTest {
@Test
void generateDelayShouldReturnZeroWhenZeroRetryCount() {
- WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoff.defaultRetryBackoff());
+ WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.DEFAULT);
assertThat(generator.generateDelay(0))
.isEqualTo(Duration.ofMillis(0));
@@ -38,7 +38,7 @@ class WaitDelayGeneratorTest {
@Test
void generateDelayShouldReturnByRandomInRangeOfExponentialGrowthOfRetryCount() {
- WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoff.builder()
+ WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
.maxRetries(4)
.firstBackoff(Duration.ofMillis(100))
.jitterFactor(0.5)
@@ -55,4 +55,49 @@ class WaitDelayGeneratorTest {
.isBetween(800L, 1200L);
});
}
+
+ @Test
+ void generateDelayShouldReturnZeroWhenZeroMaxRetries() {
+ WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
+ .maxRetries(0)
+ .firstBackoff(Duration.ofMillis(1000))
+ .jitterFactor(0.5)
+ .build());
+
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ZERO);
+ softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ZERO);
+ softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ZERO);
+ });
+ }
+
+ @Test
+ void generateDelayShouldReturnZeroWhenZeroFirstBackOff() {
+ WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
+ .maxRetries(3)
+ .firstBackoff(Duration.ZERO)
+ .jitterFactor(0.5)
+ .build());
+
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ZERO);
+ softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ZERO);
+ softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ZERO);
+ });
+ }
+
+ @Test
+ void generateDelayShouldReturnFloorOfExponentialGrowthStepsWhenZeroJitterFactor() {
+ WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder()
+ .maxRetries(3)
+ .firstBackoff(Duration.ofMillis(100))
+ .jitterFactor(0.0)
+ .build());
+
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ofMillis(100));
+ softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ofMillis(200));
+ softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ofMillis(400));
+ });
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org