You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/04/21 03:57:01 UTC

[02/17] james-project git commit: JAMES-1999 Elasticsearch does not retry connecting on error

JAMES-1999 Elasticsearch does not retry connecting on error


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d258d900
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d258d900
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d258d900

Branch: refs/heads/master
Commit: d258d900fea5c220480bcaad095942ecd59d2b12
Parents: 6c5912f
Author: quynhn <qn...@linagora.com>
Authored: Fri Apr 14 09:48:31 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Fri Apr 21 07:52:19 2017 +0700

----------------------------------------------------------------------
 .../modules/mailbox/CassandraSessionModule.java | 43 ++++++++++++--------
 .../mailbox/ElasticSearchMailboxModule.java     | 26 ++++++------
 2 files changed, 40 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 8146c5a..8b92f99 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -29,6 +29,7 @@ import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+import com.nurkiewicz.asyncretry.function.RetryCallable;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -40,6 +41,9 @@ import org.apache.james.backends.cassandra.init.QueryLoggerConfiguration;
 import org.apache.james.backends.cassandra.init.SessionWithInitializedTablesFactory;
 import org.apache.james.filesystem.api.FileSystem;
 import org.apache.james.util.Host;
+import org.apache.james.utils.RetryExecutorUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.util.Arrays;
@@ -51,6 +55,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class CassandraSessionModule extends AbstractModule {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSessionModule.class);
 
     private static final int DEFAULT_CONNECTION_MAX_RETRIES = 10;
     private static final int DEFAULT_CONNECTION_MIN_DELAY = 5000;
@@ -91,19 +96,30 @@ public class CassandraSessionModule extends AbstractModule {
         List<Host> servers = listCassandraServers(configuration);
         QueryLoggerConfiguration queryLoggerConfiguration = getCassandraQueryLoggerConf(configuration);
 
-        return getRetryer(executor, configuration)
-                .getWithRetry(ctx -> ClusterWithKeyspaceCreatedFactory
-                        .config(
-                            ClusterBuilder.builder()
-                                .servers(servers)
-                                .queryLoggerConfiguration(queryLoggerConfiguration)
-                                .build(),
-                            configuration.getString("cassandra.keyspace"))
-                        .replicationFactor(configuration.getInt("cassandra.replication.factor"))
-                        .clusterWithInitializedKeyspace())
+        int maxRetries = configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES);
+        int minDelay = configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY);
+
+        return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoHostAvailableException.class)
+                .getWithRetry(getClusterRetryCallable(configuration, servers, queryLoggerConfiguration))
                 .get();
     }
 
+    private RetryCallable<Cluster> getClusterRetryCallable(PropertiesConfiguration configuration, List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) {
+        LOGGER.info("Trying to connect to Cassandra service");
+
+        return context -> ClusterWithKeyspaceCreatedFactory
+            .config(getCluster(servers, queryLoggerConfiguration), configuration.getString("cassandra.keyspace"))
+            .replicationFactor(configuration.getInt("cassandra.replication.factor"))
+            .clusterWithInitializedKeyspace();
+    }
+
+    private Cluster getCluster(List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) {
+        return ClusterBuilder.builder()
+            .servers(servers)
+            .queryLoggerConfiguration(queryLoggerConfiguration)
+            .build();
+    }
+
     private List<Host> listCassandraServers(PropertiesConfiguration configuration) {
         String[] ipAndPorts = configuration.getStringArray("cassandra.nodes");
 
@@ -154,13 +170,6 @@ public class CassandraSessionModule extends AbstractModule {
         return builder.build();
     }
 
-    private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) {
-        return executor.retryOn(NoHostAvailableException.class)
-                .withProportionalJitter()
-                .withMaxRetries(configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES))
-                .withMinDelay(configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY));
-    }
-
     @Provides
     private AsyncRetryExecutor provideAsyncRetryExecutor(ScheduledExecutorService scheduler) {
         return new AsyncRetryExecutor(scheduler);

http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
index 4c3c9ef..48c4145 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
@@ -42,8 +42,11 @@ import org.apache.james.mailbox.extractor.TextExtractor;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.mailbox.store.search.MessageSearchIndex;
 import org.apache.james.mailbox.tika.extractor.TikaTextExtractor;
+import org.apache.james.utils.RetryExecutorUtil;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.AbstractModule;
@@ -52,6 +55,7 @@ import com.google.inject.Scopes;
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 
 public class ElasticSearchMailboxModule extends AbstractModule {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchMailboxModule.class);
 
     public static final String ES_CONFIG_FILE = FileSystem.FILE_PROTOCOL_AND_CONF + "elasticsearch.properties";
     public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts";
@@ -77,11 +81,17 @@ public class ElasticSearchMailboxModule extends AbstractModule {
     @Singleton
     protected Client provideClientProvider(FileSystem fileSystem, AsyncRetryExecutor executor) throws ConfigurationException, FileNotFoundException, ExecutionException, InterruptedException {
         PropertiesConfiguration propertiesReader = new PropertiesConfiguration(fileSystem.getFile(ES_CONFIG_FILE));
+        int maxRetries = propertiesReader.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES);
+        int minDelay = propertiesReader.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY);
 
-        ClientProvider clientProvider = connectToCluster(propertiesReader);
+        return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoNodeAvailableException.class)
+            .getWithRetry(context -> connectToCluster(propertiesReader))
+            .get();
+    }
 
-        Client client = getRetryer(executor, propertiesReader)
-                .getWithRetry(ctx -> clientProvider.get()).get();
+    private Client createClientAndIndex(ClientProvider clientProvider, PropertiesConfiguration propertiesReader) {
+        LOGGER.info("Trying to connect to ElasticSearch service");
+        Client client = clientProvider.get();
         IndexCreationFactory.createIndex(client,
             MailboxElasticsearchConstants.MAILBOX_INDEX,
             propertiesReader.getInt("elasticsearch.nb.shards"),
@@ -122,15 +132,7 @@ public class ElasticSearchMailboxModule extends AbstractModule {
         }
     }
 
-    private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) {
-        return executor
-                .withProportionalJitter()
-                .retryOn(NoNodeAvailableException.class)
-                .withMaxRetries(configuration.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES))
-                .withMinDelay(configuration.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY));
-    }
-
-    @Provides 
+    @Provides
     @Singleton
     public IndexAttachments provideIndexAttachments(PropertiesConfiguration configuration) {
         if (configuration.getBoolean("elasticsearch.indexAttachments", DEFAULT_INDEX_ATTACHMENTS)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org