You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/06/24 09:09:46 UTC

[james-project] 01/04: JAMES-2801 use elastic Scheduler by default for retry and delay

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fa54f12fea2d7390419baa813f47ed6bcfa62f0f
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jun 20 17:23:24 2019 +0200

    JAMES-2801 use elastic Scheduler by default for retry and delay
---
 .../org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java | 4 ++--
 .../java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java    | 4 ++--
 .../java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java | 4 ++--
 .../apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java | 4 +++-
 .../java/org/apache/james/mailbox/events/delivery/EventDelivery.java | 5 +++--
 .../java/org/apache/james/mailbox/events/WaitDelayGenerator.java     | 3 ++-
 .../org/apache/james/modules/mailbox/ElasticSearchClientModule.java  | 3 ++-
 .../org/apache/james/modules/mailbox/ResilientClusterProvider.java   | 3 ++-
 8 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
index 4c67175..d733914 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
@@ -58,8 +58,8 @@ public class RabbitMQConnectionFactory {
     }
 
     Mono<Connection> connectionMono() {
+        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.fromCallable(connectionFactory::newConnection)
-            .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()))
-            .publishOn(Schedulers.elastic());
+            .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.elastic());
     }
 }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
index 2a7da85..16a8a7f 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
@@ -85,9 +85,9 @@ public class SimpleChannelPool implements RabbitMQChannelPool {
     private Mono<Channel> getResilientChannel() {
         int numRetries = 100;
         Duration initialDelay = Duration.ofMillis(100);
+        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.defer(this::getOpenChannel)
-            .publishOn(Schedulers.elastic())
-            .retryBackoff(numRetries, initialDelay);
+            .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic());
     }
 
     private Mono<Channel> getOpenChannel() {
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
index 519c1fa..5f07719 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
@@ -55,9 +55,9 @@ public class SimpleConnectionPool implements AutoCloseable {
     public Mono<Connection> getResilientConnection() {
         int numRetries = 100;
         Duration initialDelay = Duration.ofMillis(100);
+        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.defer(this::getOpenConnection)
-            .subscribeOn(Schedulers.elastic())
-            .retryBackoff(numRetries, initialDelay);
+            .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic());
     }
 
     private Mono<Connection> getOpenConnection() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index c173392..02a7adc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -51,6 +51,7 @@ import com.datastax.driver.core.Session;
 import com.google.common.base.MoreObjects;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraModSeqProvider implements ModSeqProvider {
 
@@ -186,9 +187,10 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     }
 
     private Mono<ModSeq> handleRetries(CassandraId mailboxId) {
+        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return tryFindThenUpdateOnce(mailboxId)
             .single()
-            .retryBackoff(maxModSeqRetries, Duration.ofMillis(2));
+            .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.elastic());
     }
 
     private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index e7fbcc1..7b2fb67 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface EventDelivery {
 
@@ -75,7 +76,7 @@ public interface EventDelivery {
             }
 
             private static final Logger LOGGER = LoggerFactory.getLogger(BackoffRetryer.class);
-            private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
+            private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
 
             private final RetryBackoffConfiguration retryBackoff;
             private final MailboxListener mailboxListener;
@@ -88,7 +89,7 @@ public interface EventDelivery {
             @Override
             public Mono<Void> doRetry(Mono<Void> executionResult, Event event) {
                 return executionResult
-                    .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
+                    .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
                     .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
                         mailboxListener.getClass().getCanonicalName(),
                         retryBackoff.getMaxRetries(),
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
index 93a20df..ff0378b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 class WaitDelayGenerator {
 
@@ -56,7 +57,7 @@ class WaitDelayGenerator {
         }
 
         return countRetryMono
-            .delayElement(generateDelay(retryCount));
+            .delayElement(generateDelay(retryCount), Schedulers.elastic());
     }
 
     @VisibleForTesting
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
index 96350a1..e485fee 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
@@ -50,10 +50,11 @@ public class ElasticSearchClientModule extends AbstractModule {
     @Singleton
     protected RestHighLevelClient provideClient(ElasticSearchConfiguration configuration) {
         Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
+        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.fromCallable(() -> connectToCluster(configuration))
             .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}",
                 DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), true, true), e))
-            .retryBackoff(configuration.getMaxRetries(), waitDelay, waitDelay)
+            .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic())
             .publishOn(Schedulers.elastic())
             .block();
     }
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
index 9ed14c2..5259233 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
@@ -49,9 +49,10 @@ public class ResilientClusterProvider implements Provider<Cluster> {
     @Inject
     private ResilientClusterProvider(ClusterConfiguration configuration) {
         Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
+        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         cluster = Mono.fromCallable(getClusterRetryCallable(configuration))
             .doOnError(e -> LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", waitDelay, e))
-            .retryBackoff(configuration.getMaxRetry(), waitDelay, waitDelay)
+            .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.elastic())
             .publishOn(Schedulers.elastic())
             .block();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org