You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/07 02:34:04 UTC
[james-project] branch 3.7.x updated: JAMES-3892 Allow configuring the count of retries in LocalDelivery [BACKPORT] (#1469)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.7.x by this push:
new fd3812c0bc JAMES-3892 Allow configuring the count of retries in LocalDelivery [BACKPORT] (#1469)
fd3812c0bc is described below
commit fd3812c0bc7516ad709a5d0918d77e2cc4e8a9dd
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Mar 7 09:33:58 2023 +0700
JAMES-3892 Allow configuring the count of retries in LocalDelivery [BACKPORT] (#1469)
---
.../java/org/apache/mailet/base/MailetUtil.java | 28 +++++----
.../james/transport/mailets/LocalDelivery.java | 4 ++
.../transport/mailets/delivery/MailDispatcher.java | 31 +++++++---
.../mailets/delivery/MailDispatcherTest.java | 66 ++++++++++++++++++++++
4 files changed, 110 insertions(+), 19 deletions(-)
diff --git a/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java b/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
index c6b33626c5..1c19c2d249 100644
--- a/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
+++ b/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
@@ -25,6 +25,7 @@ import javax.mail.MessagingException;
import org.apache.mailet.MailetConfig;
+import com.github.fge.lambdas.Throwing;
import com.google.common.base.Strings;
/**
@@ -52,30 +53,33 @@ public class MailetUtil {
}
public static int getInitParameterAsStrictlyPositiveInteger(String condition, int defaultValue) throws MessagingException {
- String defaultStringValue = String.valueOf(defaultValue);
- return getInitParameterAsStrictlyPositiveInteger(condition, Optional.of(defaultStringValue));
+ return getInitParameterAsStrictlyPositiveInteger(condition, Optional.of(defaultValue));
}
public static int getInitParameterAsStrictlyPositiveInteger(String condition) throws MessagingException {
return getInitParameterAsStrictlyPositiveInteger(condition, Optional.empty());
}
- private static int getInitParameterAsStrictlyPositiveInteger(String condition, Optional<String> defaultValue) throws MessagingException {
- String value = Optional.ofNullable(condition)
- .orElse(defaultValue.orElse(null));
-
- if (Strings.isNullOrEmpty(value)) {
- throw new MessagingException("Condition is required. It should be a strictly positive integer");
- }
-
- int valueAsInt = tryParseInteger(value);
+ private static int getInitParameterAsStrictlyPositiveInteger(String condition, Optional<Integer> defaultValue) throws MessagingException {
+ int valueAsInt = getInitParameterAsInteger(condition, defaultValue);
if (valueAsInt < 1) {
- throw new MessagingException("Expecting condition to be a strictly positive integer. Got " + value);
+ throw new MessagingException("Expecting condition to be a strictly positive integer. Got " + valueAsInt);
}
return valueAsInt;
}
+ public static int getInitParameterAsInteger(String condition, Optional<Integer> defaultValue) throws MessagingException {
+ if (Strings.isNullOrEmpty(condition) && defaultValue.isEmpty()) {
+ throw new MessagingException("Condition is required. It should be a strictly positive integer");
+ }
+
+ return Optional.ofNullable(condition)
+ .map(Throwing.<String, Integer>function(MailetUtil::tryParseInteger).sneakyThrow())
+ .or(() -> defaultValue)
+ .get();
+ }
+
private static int tryParseInteger(String value) throws MessagingException {
try {
return Integer.parseInt(value);
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
index b6ac485359..4f79ed58db 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
@@ -19,6 +19,8 @@
package org.apache.james.transport.mailets;
+import java.util.Optional;
+
import javax.inject.Inject;
import javax.inject.Named;
import javax.mail.MessagingException;
@@ -32,6 +34,7 @@ import org.apache.james.transport.mailets.delivery.SimpleMailStore;
import org.apache.james.user.api.UsersRepository;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
+import org.apache.mailet.base.MailetUtil;
/**
* Receives a Mail from the Queue and takes care of delivery of the
@@ -77,6 +80,7 @@ public class LocalDelivery extends GenericMailet {
.metric(metricFactory.generate(LOCAL_DELIVERED_MAILS_METRIC_NAME))
.build())
.consume(getInitParameter("consume", true))
+ .retries(MailetUtil.getInitParameterAsInteger(getInitParameter("retries"), Optional.of(MailDispatcher.RETRIES)))
.mailetContext(getMailetContext())
.build();
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
index b8cb379f57..f5e1474493 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
@@ -50,7 +51,7 @@ import reactor.util.retry.Retry;
public class MailDispatcher {
private static final Logger LOGGER = LoggerFactory.getLogger(MailDispatcher.class);
private static final String[] NO_HEADERS = {};
- private static final int RETRIES = 3;
+ public static final int RETRIES = 3;
private static final Duration FIRST_BACKOFF = Duration.ofMillis(200);
private static final Duration MAX_BACKOFF = Duration.ofSeconds(1);
@@ -63,6 +64,7 @@ public class MailDispatcher {
private MailStore mailStore;
private Optional<Boolean> consume = Optional.empty();
private MailetContext mailetContext;
+ private Optional<Integer> retries = Optional.empty();
public Builder consume(boolean consume) {
this.consume = Optional.of(consume);
@@ -79,10 +81,17 @@ public class MailDispatcher {
return this;
}
+ public Builder retries(int retries) {
+ if (retries > 0) {
+ this.retries = Optional.of(retries);
+ }
+ return this;
+ }
+
public MailDispatcher build() {
Preconditions.checkNotNull(mailStore);
Preconditions.checkNotNull(mailetContext);
- return new MailDispatcher(mailStore, consume.orElse(CONSUME), mailetContext);
+ return new MailDispatcher(mailStore, consume.orElse(CONSUME), mailetContext, retries);
}
}
@@ -90,11 +99,13 @@ public class MailDispatcher {
private final MailStore mailStore;
private final boolean consume;
private final MailetContext mailetContext;
+ private final Optional<Integer> retries;
- private MailDispatcher(MailStore mailStore, boolean consume, MailetContext mailetContext) {
+ private MailDispatcher(MailStore mailStore, boolean consume, MailetContext mailetContext, Optional<Integer> retries) {
this.mailStore = mailStore;
this.consume = consume;
this.mailetContext = mailetContext;
+ this.retries = retries;
}
public void dispatch(Mail mail) throws MessagingException {
@@ -152,10 +163,16 @@ public class MailDispatcher {
}
private Mono<Void> storeMailWithRetry(Mail mail, MailAddress recipient) {
- return Mono.from(mailStore.storeMail(recipient, mail))
- .doOnError(error -> LOGGER.warn("Error While storing mail. This error will be retried.", error))
- .retryWhen(Retry.backoff(RETRIES, FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.elastic()))
- .then();
+ AtomicInteger remainRetries = new AtomicInteger(retries.orElse(0));
+
+ Mono<Void> operation = Mono.from(mailStore.storeMail(recipient, mail))
+ .doOnError(error -> LOGGER.warn("Error While storing mail. This error will be retried for {} more times.", remainRetries.getAndDecrement(), error));
+
+ return retries.map(count ->
+ operation
+ .retryWhen(Retry.backoff(count, FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.parallel()))
+ .then())
+ .orElse(operation);
}
private Map<String, List<String>> saveHeaders(Mail mail, MailAddress recipient) throws MessagingException {
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
index d00e156ad6..60bef7e86b 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
@@ -21,6 +21,7 @@ package org.apache.james.transport.mailets.delivery;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -30,6 +31,7 @@ import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.mail.MessagingException;
@@ -46,6 +48,8 @@ import org.apache.mailet.base.test.FakeMailContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.reactivestreams.Publisher;
import com.github.fge.lambdas.Throwing;
@@ -92,6 +96,68 @@ class MailDispatcherTest {
verifyNoMoreInteractions(mailStore);
}
+ @Test
+ void dispatchShouldPerformRetries() throws Exception {
+ MailDispatcher testee = MailDispatcher.builder()
+ .mailetContext(fakeMailContext)
+ .retries(3)
+ .mailStore(mailStore)
+ .consume(true)
+ .build();
+
+ AtomicInteger counter = new AtomicInteger(0);
+ when(mailStore.storeMail(any(), any())).thenAnswer(invocationOnMock -> Mono.error(() -> {
+ counter.getAndIncrement();
+ return new RuntimeException();
+ }));
+
+ FakeMail mail = FakeMail.builder()
+ .name("name")
+ .sender(MailAddressFixture.OTHER_AT_JAMES)
+ .recipients(MailAddressFixture.ANY_AT_JAMES)
+ .state("state")
+ .mimeMessage(MimeMessageUtil.defaultMimeMessage())
+ .build();
+ try {
+ testee.dispatch(mail);
+ } catch (Exception e) {
+ // ignore
+ }
+
+ assertThat(counter.get()).isEqualTo(4);
+ }
+
+ @Test
+ void disableRetries() throws Exception {
+ MailDispatcher testee = MailDispatcher.builder()
+ .mailetContext(fakeMailContext)
+ .retries(0)
+ .mailStore(mailStore)
+ .consume(true)
+ .build();
+
+ AtomicInteger counter = new AtomicInteger(0);
+ when(mailStore.storeMail(any(), any())).thenAnswer(invocationOnMock -> Mono.error(() -> {
+ counter.getAndIncrement();
+ return new RuntimeException();
+ }));
+
+ FakeMail mail = FakeMail.builder()
+ .name("name")
+ .sender(MailAddressFixture.OTHER_AT_JAMES)
+ .recipients(MailAddressFixture.ANY_AT_JAMES)
+ .state("state")
+ .mimeMessage(MimeMessageUtil.defaultMimeMessage())
+ .build();
+ try {
+ testee.dispatch(mail);
+ } catch (Exception e) {
+ // ignore
+ }
+
+ assertThat(counter.get()).isEqualTo(1);
+ }
+
@Test
void dispatchShouldConsumeMailIfSpecified() throws Exception {
MailDispatcher testee = MailDispatcher.builder()
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org