You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/06/24 09:09:46 UTC
[james-project] 01/04: JAMES-2801 use elastic Scheduler by default for retry and delay
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fa54f12fea2d7390419baa813f47ed6bcfa62f0f
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jun 20 17:23:24 2019 +0200
JAMES-2801 use elastic Scheduler by default for retry and delay
---
.../org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java | 4 ++--
.../java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java | 4 ++--
.../java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java | 4 ++--
.../apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java | 4 +++-
.../java/org/apache/james/mailbox/events/delivery/EventDelivery.java | 5 +++--
.../java/org/apache/james/mailbox/events/WaitDelayGenerator.java | 3 ++-
.../org/apache/james/modules/mailbox/ElasticSearchClientModule.java | 3 ++-
.../org/apache/james/modules/mailbox/ResilientClusterProvider.java | 3 ++-
8 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
index 4c67175..d733914 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
@@ -58,8 +58,8 @@ public class RabbitMQConnectionFactory {
}
Mono<Connection> connectionMono() {
+ Duration forever = Duration.ofMillis(Long.MAX_VALUE);
return Mono.fromCallable(connectionFactory::newConnection)
- .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()))
- .publishOn(Schedulers.elastic());
+ .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.elastic());
}
}
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
index 2a7da85..16a8a7f 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
@@ -85,9 +85,9 @@ public class SimpleChannelPool implements RabbitMQChannelPool {
private Mono<Channel> getResilientChannel() {
int numRetries = 100;
Duration initialDelay = Duration.ofMillis(100);
+ Duration forever = Duration.ofMillis(Long.MAX_VALUE);
return Mono.defer(this::getOpenChannel)
- .publishOn(Schedulers.elastic())
- .retryBackoff(numRetries, initialDelay);
+ .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic());
}
private Mono<Channel> getOpenChannel() {
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
index 519c1fa..5f07719 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
@@ -55,9 +55,9 @@ public class SimpleConnectionPool implements AutoCloseable {
public Mono<Connection> getResilientConnection() {
int numRetries = 100;
Duration initialDelay = Duration.ofMillis(100);
+ Duration forever = Duration.ofMillis(Long.MAX_VALUE);
return Mono.defer(this::getOpenConnection)
- .subscribeOn(Schedulers.elastic())
- .retryBackoff(numRetries, initialDelay);
+ .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic());
}
private Mono<Connection> getOpenConnection() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index c173392..02a7adc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -51,6 +51,7 @@ import com.datastax.driver.core.Session;
import com.google.common.base.MoreObjects;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class CassandraModSeqProvider implements ModSeqProvider {
@@ -186,9 +187,10 @@ public class CassandraModSeqProvider implements ModSeqProvider {
}
private Mono<ModSeq> handleRetries(CassandraId mailboxId) {
+ Duration forever = Duration.ofMillis(Long.MAX_VALUE);
return tryFindThenUpdateOnce(mailboxId)
.single()
- .retryBackoff(maxModSeqRetries, Duration.ofMillis(2));
+ .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.elastic());
}
private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index e7fbcc1..7b2fb67 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public interface EventDelivery {
@@ -75,7 +76,7 @@ public interface EventDelivery {
}
private static final Logger LOGGER = LoggerFactory.getLogger(BackoffRetryer.class);
- private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
+ private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
private final RetryBackoffConfiguration retryBackoff;
private final MailboxListener mailboxListener;
@@ -88,7 +89,7 @@ public interface EventDelivery {
@Override
public Mono<Void> doRetry(Mono<Void> executionResult, Event event) {
return executionResult
- .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
.doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
mailboxListener.getClass().getCanonicalName(),
retryBackoff.getMaxRetries(),
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
index 93a20df..ff0378b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
class WaitDelayGenerator {
@@ -56,7 +57,7 @@ class WaitDelayGenerator {
}
return countRetryMono
- .delayElement(generateDelay(retryCount));
+ .delayElement(generateDelay(retryCount), Schedulers.elastic());
}
@VisibleForTesting
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
index 96350a1..e485fee 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
@@ -50,10 +50,11 @@ public class ElasticSearchClientModule extends AbstractModule {
@Singleton
protected RestHighLevelClient provideClient(ElasticSearchConfiguration configuration) {
Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
+ Duration forever = Duration.ofMillis(Long.MAX_VALUE);
return Mono.fromCallable(() -> connectToCluster(configuration))
.doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}",
DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), true, true), e))
- .retryBackoff(configuration.getMaxRetries(), waitDelay, waitDelay)
+ .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic())
.publishOn(Schedulers.elastic())
.block();
}
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
index 9ed14c2..5259233 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ResilientClusterProvider.java
@@ -49,9 +49,10 @@ public class ResilientClusterProvider implements Provider<Cluster> {
@Inject
private ResilientClusterProvider(ClusterConfiguration configuration) {
Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
+ Duration forever = Duration.ofMillis(Long.MAX_VALUE);
cluster = Mono.fromCallable(getClusterRetryCallable(configuration))
.doOnError(e -> LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", waitDelay, e))
- .retryBackoff(configuration.getMaxRetry(), waitDelay, waitDelay)
+ .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.elastic())
.publishOn(Schedulers.elastic())
.block();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org