You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/04/21 03:57:01 UTC
[02/17] james-project git commit: JAMES-1999 Elasticsearch does not
retry to connect when error
JAMES-1999 Elasticsearch does not retry to connect when error
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d258d900
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d258d900
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d258d900
Branch: refs/heads/master
Commit: d258d900fea5c220480bcaad095942ecd59d2b12
Parents: 6c5912f
Author: quynhn <qn...@linagora.com>
Authored: Fri Apr 14 09:48:31 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Fri Apr 21 07:52:19 2017 +0700
----------------------------------------------------------------------
.../modules/mailbox/CassandraSessionModule.java | 43 ++++++++++++--------
.../mailbox/ElasticSearchMailboxModule.java | 26 ++++++------
2 files changed, 40 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 8146c5a..8b92f99 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -29,6 +29,7 @@ import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+import com.nurkiewicz.asyncretry.function.RetryCallable;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -40,6 +41,9 @@ import org.apache.james.backends.cassandra.init.QueryLoggerConfiguration;
import org.apache.james.backends.cassandra.init.SessionWithInitializedTablesFactory;
import org.apache.james.filesystem.api.FileSystem;
import org.apache.james.util.Host;
+import org.apache.james.utils.RetryExecutorUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.util.Arrays;
@@ -51,6 +55,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class CassandraSessionModule extends AbstractModule {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSessionModule.class);
private static final int DEFAULT_CONNECTION_MAX_RETRIES = 10;
private static final int DEFAULT_CONNECTION_MIN_DELAY = 5000;
@@ -91,19 +96,30 @@ public class CassandraSessionModule extends AbstractModule {
List<Host> servers = listCassandraServers(configuration);
QueryLoggerConfiguration queryLoggerConfiguration = getCassandraQueryLoggerConf(configuration);
- return getRetryer(executor, configuration)
- .getWithRetry(ctx -> ClusterWithKeyspaceCreatedFactory
- .config(
- ClusterBuilder.builder()
- .servers(servers)
- .queryLoggerConfiguration(queryLoggerConfiguration)
- .build(),
- configuration.getString("cassandra.keyspace"))
- .replicationFactor(configuration.getInt("cassandra.replication.factor"))
- .clusterWithInitializedKeyspace())
+ int maxRetries = configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES);
+ int minDelay = configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY);
+
+ return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoHostAvailableException.class)
+ .getWithRetry(getClusterRetryCallable(configuration, servers, queryLoggerConfiguration))
.get();
}
+ private RetryCallable<Cluster> getClusterRetryCallable(PropertiesConfiguration configuration, List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) {
+ LOGGER.info("Trying to connect to Cassandra service");
+
+ return context -> ClusterWithKeyspaceCreatedFactory
+ .config(getCluster(servers, queryLoggerConfiguration), configuration.getString("cassandra.keyspace"))
+ .replicationFactor(configuration.getInt("cassandra.replication.factor"))
+ .clusterWithInitializedKeyspace();
+ }
+
+ private Cluster getCluster(List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) {
+ return ClusterBuilder.builder()
+ .servers(servers)
+ .queryLoggerConfiguration(queryLoggerConfiguration)
+ .build();
+ }
+
private List<Host> listCassandraServers(PropertiesConfiguration configuration) {
String[] ipAndPorts = configuration.getStringArray("cassandra.nodes");
@@ -154,13 +170,6 @@ public class CassandraSessionModule extends AbstractModule {
return builder.build();
}
- private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) {
- return executor.retryOn(NoHostAvailableException.class)
- .withProportionalJitter()
- .withMaxRetries(configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES))
- .withMinDelay(configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY));
- }
-
@Provides
private AsyncRetryExecutor provideAsyncRetryExecutor(ScheduledExecutorService scheduler) {
return new AsyncRetryExecutor(scheduler);
http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
index 4c3c9ef..48c4145 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
@@ -42,8 +42,11 @@ import org.apache.james.mailbox.extractor.TextExtractor;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import org.apache.james.mailbox.store.search.MessageSearchIndex;
import org.apache.james.mailbox.tika.extractor.TikaTextExtractor;
+import org.apache.james.utils.RetryExecutorUtil;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.AbstractModule;
@@ -52,6 +55,7 @@ import com.google.inject.Scopes;
import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
public class ElasticSearchMailboxModule extends AbstractModule {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchMailboxModule.class);
public static final String ES_CONFIG_FILE = FileSystem.FILE_PROTOCOL_AND_CONF + "elasticsearch.properties";
public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts";
@@ -77,11 +81,17 @@ public class ElasticSearchMailboxModule extends AbstractModule {
@Singleton
protected Client provideClientProvider(FileSystem fileSystem, AsyncRetryExecutor executor) throws ConfigurationException, FileNotFoundException, ExecutionException, InterruptedException {
PropertiesConfiguration propertiesReader = new PropertiesConfiguration(fileSystem.getFile(ES_CONFIG_FILE));
+ int maxRetries = propertiesReader.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES);
+ int minDelay = propertiesReader.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY);
- ClientProvider clientProvider = connectToCluster(propertiesReader);
+ return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoNodeAvailableException.class)
+ .getWithRetry(context -> connectToCluster(propertiesReader))
+ .get();
+ }
- Client client = getRetryer(executor, propertiesReader)
- .getWithRetry(ctx -> clientProvider.get()).get();
+ private Client createClientAndIndex(ClientProvider clientProvider, PropertiesConfiguration propertiesReader) {
+ LOGGER.info("Trying to connect to ElasticSearch service");
+ Client client = clientProvider.get();
IndexCreationFactory.createIndex(client,
MailboxElasticsearchConstants.MAILBOX_INDEX,
propertiesReader.getInt("elasticsearch.nb.shards"),
@@ -122,15 +132,7 @@ public class ElasticSearchMailboxModule extends AbstractModule {
}
}
- private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) {
- return executor
- .withProportionalJitter()
- .retryOn(NoNodeAvailableException.class)
- .withMaxRetries(configuration.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES))
- .withMinDelay(configuration.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY));
- }
-
- @Provides
+ @Provides
@Singleton
public IndexAttachments provideIndexAttachments(PropertiesConfiguration configuration) {
if (configuration.getBoolean("elasticsearch.indexAttachments", DEFAULT_INDEX_ATTACHMENTS)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org