You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/07 02:34:04 UTC

[james-project] branch 3.7.x updated: JAMES-3892 Allow configuring the count of retries in LocalDelivery [BACKPORT] (#1469)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.7.x by this push:
     new fd3812c0bc JAMES-3892 Allow configuring the count of retries in LocalDelivery [BACKPORT] (#1469)
fd3812c0bc is described below

commit fd3812c0bc7516ad709a5d0918d77e2cc4e8a9dd
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Mar 7 09:33:58 2023 +0700

    JAMES-3892 Allow configuring the count of retries in LocalDelivery [BACKPORT] (#1469)
---
 .../java/org/apache/mailet/base/MailetUtil.java    | 28 +++++----
 .../james/transport/mailets/LocalDelivery.java     |  4 ++
 .../transport/mailets/delivery/MailDispatcher.java | 31 +++++++---
 .../mailets/delivery/MailDispatcherTest.java       | 66 ++++++++++++++++++++++
 4 files changed, 110 insertions(+), 19 deletions(-)

diff --git a/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java b/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
index c6b33626c5..1c19c2d249 100644
--- a/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
+++ b/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
@@ -25,6 +25,7 @@ import javax.mail.MessagingException;
 
 import org.apache.mailet.MailetConfig;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Strings;
 
 /**
@@ -52,30 +53,33 @@ public class MailetUtil {
     }
 
     public static int getInitParameterAsStrictlyPositiveInteger(String condition, int defaultValue) throws MessagingException {
-        String defaultStringValue = String.valueOf(defaultValue);
-        return getInitParameterAsStrictlyPositiveInteger(condition, Optional.of(defaultStringValue));
+        return getInitParameterAsStrictlyPositiveInteger(condition, Optional.of(defaultValue));
     }
 
     public static int getInitParameterAsStrictlyPositiveInteger(String condition) throws MessagingException {
         return getInitParameterAsStrictlyPositiveInteger(condition, Optional.empty());
     }
 
-    private static int getInitParameterAsStrictlyPositiveInteger(String condition, Optional<String> defaultValue) throws MessagingException {
-        String value = Optional.ofNullable(condition)
-            .orElse(defaultValue.orElse(null));
-
-        if (Strings.isNullOrEmpty(value)) {
-            throw new MessagingException("Condition is required. It should be a strictly positive integer");
-        }
-
-        int valueAsInt = tryParseInteger(value);
+    private static int getInitParameterAsStrictlyPositiveInteger(String condition, Optional<Integer> defaultValue) throws MessagingException {
+        int valueAsInt = getInitParameterAsInteger(condition, defaultValue);
 
         if (valueAsInt < 1) {
-            throw new MessagingException("Expecting condition to be a strictly positive integer. Got " + value);
+            throw new MessagingException("Expecting condition to be a strictly positive integer. Got " + valueAsInt);
         }
         return valueAsInt;
     }
 
+    public static int getInitParameterAsInteger(String condition, Optional<Integer> defaultValue) throws MessagingException {
+        if (Strings.isNullOrEmpty(condition) && defaultValue.isEmpty()) {
+            throw new MessagingException("Condition is required. It should be a strictly positive integer");
+        }
+
+        return Optional.ofNullable(condition)
+            .map(Throwing.<String, Integer>function(MailetUtil::tryParseInteger).sneakyThrow())
+            .or(() -> defaultValue)
+            .get();
+    }
+
     private static int tryParseInteger(String value) throws MessagingException {
         try {
             return Integer.parseInt(value);
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
index b6ac485359..4f79ed58db 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.transport.mailets;
 
+import java.util.Optional;
+
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.mail.MessagingException;
@@ -32,6 +34,7 @@ import org.apache.james.transport.mailets.delivery.SimpleMailStore;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
+import org.apache.mailet.base.MailetUtil;
 
 /**
  * Receives a Mail from the Queue and takes care of delivery of the
@@ -77,6 +80,7 @@ public class LocalDelivery extends GenericMailet {
                 .metric(metricFactory.generate(LOCAL_DELIVERED_MAILS_METRIC_NAME))
                 .build())
             .consume(getInitParameter("consume", true))
+            .retries(MailetUtil.getInitParameterAsInteger(getInitParameter("retries"), Optional.of(MailDispatcher.RETRIES)))
             .mailetContext(getMailetContext())
             .build();
     }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
index b8cb379f57..f5e1474493 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -50,7 +51,7 @@ import reactor.util.retry.Retry;
 public class MailDispatcher {
     private static final Logger LOGGER = LoggerFactory.getLogger(MailDispatcher.class);
     private static final String[] NO_HEADERS = {};
-    private static final int RETRIES = 3;
+    public static final int RETRIES = 3;
     private static final Duration FIRST_BACKOFF = Duration.ofMillis(200);
     private static final Duration MAX_BACKOFF = Duration.ofSeconds(1);
 
@@ -63,6 +64,7 @@ public class MailDispatcher {
         private MailStore mailStore;
         private Optional<Boolean> consume = Optional.empty();
         private MailetContext mailetContext;
+        private Optional<Integer> retries = Optional.empty();
 
         public Builder consume(boolean consume) {
             this.consume = Optional.of(consume);
@@ -79,10 +81,17 @@ public class MailDispatcher {
             return this;
         }
 
+        public Builder retries(int retries) {
+            if (retries > 0) {
+                this.retries = Optional.of(retries);
+            }
+            return this;
+        }
+
         public MailDispatcher build() {
             Preconditions.checkNotNull(mailStore);
             Preconditions.checkNotNull(mailetContext);
-            return new MailDispatcher(mailStore, consume.orElse(CONSUME), mailetContext);
+            return new MailDispatcher(mailStore, consume.orElse(CONSUME), mailetContext, retries);
         }
 
     }
@@ -90,11 +99,13 @@ public class MailDispatcher {
     private final MailStore mailStore;
     private final boolean consume;
     private final MailetContext mailetContext;
+    private final Optional<Integer> retries;
 
-    private MailDispatcher(MailStore mailStore, boolean consume, MailetContext mailetContext) {
+    private MailDispatcher(MailStore mailStore, boolean consume, MailetContext mailetContext, Optional<Integer> retries) {
         this.mailStore = mailStore;
         this.consume = consume;
         this.mailetContext = mailetContext;
+        this.retries = retries;
     }
 
     public void dispatch(Mail mail) throws MessagingException {
@@ -152,10 +163,16 @@ public class MailDispatcher {
     }
 
     private Mono<Void> storeMailWithRetry(Mail mail, MailAddress recipient) {
-       return Mono.from(mailStore.storeMail(recipient, mail))
-           .doOnError(error -> LOGGER.warn("Error While storing mail. This error will be retried.", error))
-           .retryWhen(Retry.backoff(RETRIES, FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.elastic()))
-           .then();
+        AtomicInteger remainRetries = new AtomicInteger(retries.orElse(0));
+
+        Mono<Void> operation = Mono.from(mailStore.storeMail(recipient, mail))
+            .doOnError(error -> LOGGER.warn("Error While storing mail. This error will be retried for {} more times.", remainRetries.getAndDecrement(), error));
+
+        return retries.map(count ->
+            operation
+                .retryWhen(Retry.backoff(count, FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.parallel()))
+                .then())
+            .orElse(operation);
     }
 
     private Map<String, List<String>> saveHeaders(Mail mail, MailAddress recipient) throws MessagingException {
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
index d00e156ad6..60bef7e86b 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
@@ -21,6 +21,7 @@ package org.apache.james.transport.mailets.delivery;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -30,6 +31,7 @@ import static org.mockito.Mockito.when;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.mail.MessagingException;
 
@@ -46,6 +48,8 @@ import org.apache.mailet.base.test.FakeMailContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
@@ -92,6 +96,68 @@ class MailDispatcherTest {
         verifyNoMoreInteractions(mailStore);
     }
 
+    @Test
+    void dispatchShouldPerformRetries() throws Exception {
+        MailDispatcher testee = MailDispatcher.builder()
+            .mailetContext(fakeMailContext)
+            .retries(3)
+            .mailStore(mailStore)
+            .consume(true)
+            .build();
+
+        AtomicInteger counter = new AtomicInteger(0);
+        when(mailStore.storeMail(any(), any())).thenAnswer(invocationOnMock -> Mono.error(() -> {
+            counter.getAndIncrement();
+            return new RuntimeException();
+        }));
+
+        FakeMail mail = FakeMail.builder()
+            .name("name")
+            .sender(MailAddressFixture.OTHER_AT_JAMES)
+            .recipients(MailAddressFixture.ANY_AT_JAMES)
+            .state("state")
+            .mimeMessage(MimeMessageUtil.defaultMimeMessage())
+            .build();
+        try {
+            testee.dispatch(mail);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        assertThat(counter.get()).isEqualTo(4);
+    }
+
+    @Test
+    void disableRetries() throws Exception {
+        MailDispatcher testee = MailDispatcher.builder()
+            .mailetContext(fakeMailContext)
+            .retries(0)
+            .mailStore(mailStore)
+            .consume(true)
+            .build();
+
+        AtomicInteger counter = new AtomicInteger(0);
+        when(mailStore.storeMail(any(), any())).thenAnswer(invocationOnMock -> Mono.error(() -> {
+            counter.getAndIncrement();
+            return new RuntimeException();
+        }));
+
+        FakeMail mail = FakeMail.builder()
+            .name("name")
+            .sender(MailAddressFixture.OTHER_AT_JAMES)
+            .recipients(MailAddressFixture.ANY_AT_JAMES)
+            .state("state")
+            .mimeMessage(MimeMessageUtil.defaultMimeMessage())
+            .build();
+        try {
+            testee.dispatch(mail);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        assertThat(counter.get()).isEqualTo(1);
+    }
+
     @Test
     void dispatchShouldConsumeMailIfSpecified() throws Exception {
         MailDispatcher testee = MailDispatcher.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org