You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/30 20:58:53 UTC
git commit: Added automatic restart to ES client. We seem to get
disconnected with large clusters occasionally.
Repository: incubator-usergrid
Updated Branches:
refs/heads/key-row-sharding 599d9e73c -> 4b972ead9
Added automatic restart to ES client. We seem to get disconnected with large clusters occasionally.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4b972ead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4b972ead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4b972ead
Branch: refs/heads/key-row-sharding
Commit: 4b972ead90ebfcb7d8c9bb964aa332f54a12e0fd
Parents: 599d9e7
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Oct 30 13:58:50 2014 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Oct 30 13:58:50 2014 -0600
----------------------------------------------------------------------
.../corepersistence/CpManagerCache.java | 12 +-
.../usergrid/persistence/index/IndexFig.java | 10 +
.../index/impl/EsEntityIndexBatchImpl.java | 23 ++-
.../index/impl/EsEntityIndexImpl.java | 105 ++++++----
.../persistence/index/impl/EsProvider.java | 196 ++++++++-----------
.../persistence/index/impl/FailureMonitor.java | 41 ++++
.../index/impl/FailureMonitorImpl.java | 101 ++++++++++
.../index/impl/ElasticSearchRule.java | 2 +-
8 files changed, 321 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 99bde22..a73dc6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -44,8 +44,7 @@ public class CpManagerCache {
private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
CacheBuilder.newBuilder()
- .maximumSize(100)
- .expireAfterWrite( 1, TimeUnit.HOURS)
+ .maximumSize(1000)
.build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
public EntityCollectionManager load( CollectionScope scope ) {
return ecmf.createCollectionManager( scope );
@@ -55,8 +54,7 @@ public class CpManagerCache {
private LoadingCache<ApplicationScope, EntityIndex> eiCache =
CacheBuilder.newBuilder()
- .maximumSize(100)
- .expireAfterWrite( 1, TimeUnit.HOURS )
+ .maximumSize(1000)
.build( new CacheLoader<ApplicationScope, EntityIndex>() {
public EntityIndex load( ApplicationScope scope ) {
return eif.createEntityIndex( scope );
@@ -66,8 +64,7 @@ public class CpManagerCache {
private LoadingCache<ApplicationScope, GraphManager> gmCache =
CacheBuilder.newBuilder()
- .maximumSize(100)
- .expireAfterWrite( 1, TimeUnit.HOURS )
+ .maximumSize(1000)
.build( new CacheLoader<ApplicationScope, GraphManager>() {
public GraphManager load( ApplicationScope scope ) {
return gmf.createEdgeManager( scope );
@@ -77,8 +74,7 @@ public class CpManagerCache {
private LoadingCache<MapScope, MapManager> mmCache =
CacheBuilder.newBuilder()
- .maximumSize(100)
- .expireAfterWrite( 1, TimeUnit.HOURS )
+ .maximumSize(1000)
.build( new CacheLoader<MapScope, MapManager>() {
public MapManager load( MapScope scope ) {
return mmf.createMapManager( scope );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 142c48a..e10ce44 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -43,6 +43,12 @@ public interface IndexFig extends GuicyFig {
public static final String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
+
+ /**
+ * the number of times we can fail before we refresh the client
+ */
+ public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
+
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@Default( "127.0.0.1" )
@@ -82,4 +88,8 @@ public interface IndexFig extends GuicyFig {
@Default("default")
@Key( ELASTICSEARCH_NODENAME )
public String getNodeName();
+
+ @Default( "20" )
+ @Key( ELASTICSEARCH_FAIL_REFRESH )
+ int getFailRefreshCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index fab135a..ad58fa8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -88,15 +88,15 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private int count;
+ private final FailureMonitor failureMonitor;
- public EsEntityIndexBatchImpl(
- final ApplicationScope applicationScope,
- final Client client,
- final IndexFig config,
- final int autoFlushSize ) {
+
+ public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client, final IndexFig config,
+ final int autoFlushSize, final FailureMonitor failureMonitor ) {
this.applicationScope = applicationScope;
this.client = client;
+ this.failureMonitor = failureMonitor;
this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
this.refresh = config.isForcedRefresh();
this.autoFlushSize = autoFlushSize;
@@ -208,7 +208,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
return;
}
- final BulkResponse responses = request.execute().actionGet();
+ final BulkResponse responses;
+
+ try {
+ responses = request.execute().actionGet();
+ }catch(Throwable t){
+ log.error( "Unable to communicate with elasticsearch" );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+
+
+ failureMonitor.success();
for ( BulkItemResponse response : responses ) {
if ( response.isFailed() ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index fc09b5a..f8e73c2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -81,16 +81,22 @@ public class EsEntityIndexImpl implements EntityIndex {
private static final AtomicBoolean mappingsCreated = new AtomicBoolean( false );
+ /**
+ * We purposefully make this per instance. Some indexes may work, while others may fail
+ */
+ private FailureMonitor failureMonitor;
+
private final String indexName;
private final ApplicationScope applicationScope;
- private final Client client;
+ private final EsProvider esProvider;
private final int cursorTimeout;
private final IndexFig config;
+
//number of times to wait for the index to refresh properly.
private static final int MAX_WAITS = 10;
//number of milliseconds to try again before sleeping
@@ -111,10 +117,11 @@ public class EsEntityIndexImpl implements EntityIndex {
ValidationUtils.validateApplicationScope( appScope );
this.applicationScope = appScope;
- this.client = provider.getClient();
+ this.esProvider = provider;
this.config = config;
this.cursorTimeout = config.getQueryCursorTimeout();
this.indexName = IndexingUtils.createIndexName( config.getIndexPrefix(), appScope );
+ this.failureMonitor = new FailureMonitorImpl( config, provider );
}
@@ -126,7 +133,7 @@ public class EsEntityIndexImpl implements EntityIndex {
createMappings();
}
- AdminClient admin = client.admin();
+ AdminClient admin = esProvider.getClient().admin();
CreateIndexResponse cir = admin.indices().prepareCreate( indexName ).execute().actionGet();
log.info( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
@@ -134,7 +141,8 @@ public class EsEntityIndexImpl implements EntityIndex {
// Immediately create a document and remove it to ensure the entire cluster is ready
// to receive documents. Occasionally we see errors. See this post:
- // http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
+ // http://elasticsearch-users.115913.n3.nabble
+ // .com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
testNewIndex();
}
@@ -164,18 +172,16 @@ public class EsEntityIndexImpl implements EntityIndex {
public boolean doOp() {
final String tempId = UUIDGenerator.newTimeUUID().toString();
- client.prepareIndex( indexName, VERIFY_TYPE, tempId )
- .setSource( DEFAULT_PAYLOAD ).get();
+ esProvider.getClient().prepareIndex( indexName, VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD ).get();
- log.info( "Successfully created new document with docId {} in index {} and type {}",
- tempId, indexName, VERIFY_TYPE );
+ log.info( "Successfully created new document with docId {} in index {} and type {}", tempId, indexName,
+ VERIFY_TYPE );
// delete all types, this way if we miss one it will get cleaned up
- client.prepareDeleteByQuery( indexName ).setTypes( VERIFY_TYPE )
- .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
+ esProvider.getClient().prepareDeleteByQuery( indexName ).setTypes( VERIFY_TYPE ).setQuery( MATCH_ALL_QUERY_BUILDER )
+ .get();
- log.info( "Successfully deleted all documents in index {} and type {}",
- indexName, VERIFY_TYPE );
+ log.info( "Successfully deleted all documents in index {} and type {}", indexName, VERIFY_TYPE );
return true;
}
@@ -186,25 +192,24 @@ public class EsEntityIndexImpl implements EntityIndex {
/**
- * Setup ElasticSearch type mappings as a template that applies to all new indexes.
- * Applies to all indexes that start with our prefix.
+ * Setup ElasticSearch type mappings as a template that applies to all new indexes. Applies to all indexes that
+ * start with our prefix.
*/
private void createMappings() throws IOException {
- XContentBuilder xcb = IndexingUtils
- .createDoubleStringIndexMapping( XContentFactory.jsonBuilder(), "_default_" );
+ XContentBuilder xcb =
+ IndexingUtils.createDoubleStringIndexMapping( XContentFactory.jsonBuilder(), "_default_" );
- PutIndexTemplateResponse pitr = client.admin().indices()
- .preparePutTemplate( "usergrid_template" )
- .setTemplate( config.getIndexPrefix() + "*" )
- .addMapping( "_default_", xcb ) // set mapping as the default for all types
- .execute().actionGet();
+ PutIndexTemplateResponse pitr = esProvider.getClient().admin().indices().preparePutTemplate( "usergrid_template" )
+ .setTemplate( config.getIndexPrefix() + "*" ).addMapping( "_default_",
+ xcb ) // set mapping as the default for all types
+ .execute().actionGet();
}
@Override
public EntityIndexBatch createBatch() {
- return new EsEntityIndexBatchImpl( applicationScope, client, config, 1000 );
+ return new EsEntityIndexBatchImpl( applicationScope, esProvider.getClient(), config, 1000, failureMonitor );
}
@@ -224,8 +229,9 @@ public class EsEntityIndexImpl implements EntityIndex {
SearchResponse searchResponse;
if ( query.getCursor() == null ) {
- SearchRequestBuilder srb = client.prepareSearch( indexName )
- .setTypes( indexType ).setScroll( cursorTimeout + "m" ) .setQuery( qb );
+ SearchRequestBuilder srb =
+ esProvider.getClient().prepareSearch( indexName ).setTypes( indexType ).setScroll( cursorTimeout + "m" )
+ .setQuery( qb );
FilterBuilder fb = query.createFilterBuilder();
if ( fb != null ) {
@@ -251,25 +257,35 @@ public class EsEntityIndexImpl implements EntityIndex {
// to ignore any fields that are not present.
final String stringFieldName = STRING_PREFIX + sp.getPropertyName();
- final FieldSortBuilder stringSort = SortBuilders.fieldSort( stringFieldName )
- .order( order ).ignoreUnmapped( true );
+ final FieldSortBuilder stringSort =
+ SortBuilders.fieldSort( stringFieldName ).order( order ).ignoreUnmapped( true );
srb.addSort( stringSort );
log.debug( " Sort: {} order by {}", stringFieldName, order.toString() );
final String numberFieldName = NUMBER_PREFIX + sp.getPropertyName();
- final FieldSortBuilder numberSort = SortBuilders.fieldSort( numberFieldName )
- .order( order ).ignoreUnmapped( true );
+ final FieldSortBuilder numberSort =
+ SortBuilders.fieldSort( numberFieldName ).order( order ).ignoreUnmapped( true );
srb.addSort( numberSort );
log.debug( " Sort: {} order by {}", numberFieldName, order.toString() );
final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName();
- final FieldSortBuilder booleanSort = SortBuilders.fieldSort( booleanFieldName )
- .order( order ).ignoreUnmapped( true );
+ final FieldSortBuilder booleanSort =
+ SortBuilders.fieldSort( booleanFieldName ).order( order ).ignoreUnmapped( true );
srb.addSort( booleanSort );
log.debug( " Sort: {} order by {}", booleanFieldName, order.toString() );
}
- searchResponse = srb.execute().actionGet();
+ try {
+ searchResponse = srb.execute().actionGet();
+ }
+ catch ( Throwable t ) {
+ log.error( "Unable to communicate with elasticsearch" );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+
+
+ failureMonitor.success();
}
else {
String scrollId = query.getCursor();
@@ -281,9 +297,19 @@ public class EsEntityIndexImpl implements EntityIndex {
}
log.debug( "Executing query with cursor: {} ", scrollId );
- SearchScrollRequestBuilder ssrb = client.prepareSearchScroll( scrollId )
- .setScroll( cursorTimeout + "m" );
- searchResponse = ssrb.execute().actionGet();
+ SearchScrollRequestBuilder ssrb = esProvider.getClient().prepareSearchScroll( scrollId ).setScroll( cursorTimeout + "m" );
+
+ try {
+ searchResponse = ssrb.execute().actionGet();
+ }
+ catch ( Throwable t ) {
+ log.error( "Unable to communicate with elasticsearch" );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+
+
+ failureMonitor.success();
}
SearchHits hits = searchResponse.getHits();
@@ -323,12 +349,12 @@ public class EsEntityIndexImpl implements EntityIndex {
@Override
public boolean doOp() {
try {
- client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+ esProvider.getClient().admin().indices().prepareRefresh( indexName ).execute().actionGet();
log.debug( "Refreshed index: " + indexName );
return true;
}
catch ( IndexMissingException e ) {
- log.error( "Unable to refresh index after create. Waiting before sleeping.", e);
+ log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
throw e;
}
}
@@ -353,7 +379,7 @@ public class EsEntityIndexImpl implements EntityIndex {
* For testing only.
*/
public void deleteIndex() {
- AdminClient adminClient = client.admin();
+ AdminClient adminClient = esProvider.getClient().admin();
DeleteIndexResponse response = adminClient.indices().prepareDelete( indexName ).get();
if ( response.isAcknowledged() ) {
log.info( "Deleted index: " + indexName );
@@ -366,13 +392,12 @@ public class EsEntityIndexImpl implements EntityIndex {
/**
* Do the retry operation
- * @param operation
*/
private void doInRetry( final RetryOperation operation ) {
for ( int i = 0; i < MAX_WAITS; i++ ) {
try {
- if(operation.doOp()){
+ if ( operation.doOp() ) {
return;
}
}
@@ -401,4 +426,6 @@ public class EsEntityIndexImpl implements EntityIndex {
*/
public boolean doOp();
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
index efc61a9..eb99e3e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
@@ -18,17 +18,13 @@
*/
package org.apache.usergrid.persistence.index.impl;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
+
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.usergrid.persistence.core.util.AvailablePortFinder;
-import org.apache.usergrid.persistence.index.IndexFig;
+
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@@ -37,6 +33,14 @@ import org.elasticsearch.node.NodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
/**
* Provides access to ElasticSearch client and, optionally, embedded ElasticSearch for testing.
@@ -44,154 +48,116 @@ import org.slf4j.LoggerFactory;
@Singleton
public class EsProvider {
- private static final Logger log = LoggerFactory.getLogger(EsProvider.class);
+ private static final Logger log = LoggerFactory.getLogger( EsProvider.class );
private final IndexFig indexFig;
private static Client client;
+ private static Node node;
public static String LOCAL_ES_PORT_PROPNAME = "EMBEDDED_ES_PORT";
@Inject
- public EsProvider(IndexFig fig) {
+ public EsProvider( IndexFig fig ) {
this.indexFig = fig;
}
- public synchronized Client getClient() {
- if (client == null) {
- client = getClient(indexFig);
+
+ /**
+ * Get the client instnace
+ */
+ public Client getClient() {
+ if ( client == null ) {
+ //synchronize on creating the client so we don't create too many
+ createClient( indexFig );
}
return client;
}
-
+
+ /**
+ * Reset the client instnace
+ */
public void releaseClient() {
- client = null;
+ //reset our static variables
+ if ( client != null && node != null ) {
+ node.stop();
+ node = null;
+ client = null;
+ }
}
- public static synchronized Client getClient(IndexFig fig) {
-
- if (client == null) {
-
- Client newClient = null;
-
- if ("embedded".equals( fig.getStartUp()) ) {
+ private synchronized void createClient( IndexFig fig ) {
- int port = AvailablePortFinder.getNextAvailable(2000);
- System.setProperty( LOCAL_ES_PORT_PROPNAME, port+"" );
-
- File tempDir;
- try {
- tempDir = getTempDirectory();
- } catch (Exception ex) {
- throw new RuntimeException(
- "Fatal error unable to create temp dir, start embedded ElasticSearch", ex);
- }
-
- Settings settings = ImmutableSettings.settingsBuilder()
-
- .put("cluster.name", fig.getClusterName())
-
- .put("network.publish_host","127.0.0.1")
- .put("transport.tcp.port", port)
- .put("discovery.zen.ping.multicast.enabled","false")
- .put("node.http.enabled", false)
-
- .put("path.logs", tempDir.toString())
- .put("path.data", tempDir.toString())
-
- .put("gateway.type", "none")
- .put("index.store.type", "memory")
- .put("index.number_of_shards", 1)
- .put("index.number_of_replicas", 1)
- .build();
-
- log.info("-----------------------------------------------------------------------");
- log.info("Starting ElasticSearch embedded server settings: \n"+settings.getAsMap());
- log.info("-----------------------------------------------------------------------");
-
- Node node = NodeBuilder.nodeBuilder().settings(settings)
- .clusterName( fig.getClusterName() ).node();
-
- newClient = node.client();
-
-
- } else {
-
- String allHosts = "";
-
- if ("remote".equals( fig.getStartUp()) ) {
-
- // we will connect to ES on all configured hosts
- String SEP = "";
- for (String host : fig.getHosts().split(",")) {
- allHosts = allHosts + SEP + host + ":" + fig.getPort();
- SEP = ",";
- }
+ if ( client != null && node != null) {
+ return;
+ }
- } else {
- // we will connect to forked ES on localhost
- allHosts = "localhost:" + System.getProperty(LOCAL_ES_PORT_PROPNAME);
- }
+ String allHosts = "";
- String nodeName = fig.getNodeName();
- if ( "default".equals( nodeName )) {
- // no nodeName was specified, use hostname
- try {
- nodeName = InetAddress.getLocalHost().getHostName();
+ if ( "remote".equals( fig.getStartUp() ) ) {
- } catch (UnknownHostException ex) {
- nodeName = "client-" + RandomStringUtils.randomAlphabetic(8);
- log.warn("Couldn't get hostname to use as ES node name, using " + nodeName);
- }
- }
+ // we will connect to ES on all configured hosts
+ String SEP = "";
+ for ( String host : fig.getHosts().split( "," ) ) {
+ allHosts = allHosts + SEP + host + ":" + fig.getPort();
+ SEP = ",";
+ }
+ }
+ else {
- Settings settings = ImmutableSettings.settingsBuilder()
+ // we will connect to forked ES on localhost
+ allHosts = "localhost:" + System.getProperty( LOCAL_ES_PORT_PROPNAME );
+ }
- .put( "cluster.name", fig.getClusterName() )
+ String nodeName = fig.getNodeName();
+ if ( "default".equals( nodeName ) ) {
+ // no nodeName was specified, use hostname
+ try {
+ nodeName = InetAddress.getLocalHost().getHostName();
+ }
+ catch ( UnknownHostException ex ) {
+ nodeName = "client-" + RandomStringUtils.randomAlphabetic( 8 );
+ log.warn( "Couldn't get hostname to use as ES node name, using " + nodeName );
+ }
+ }
- // this assumes that we're using zen for host discovery. Putting an
- // explicit set of bootstrap hosts ensures we connect to a valid cluster.
- .put( "discovery.zen.ping.unicast.hosts", allHosts )
- .put( "discovery.zen.ping.multicast.enabled", "false" )
- .put("http.enabled", false)
+ Settings settings = ImmutableSettings.settingsBuilder()
- .put( "client.transport.ping_timeout", 2000 ) // milliseconds
- .put( "client.transport.nodes_sampler_interval", 100 )
- .put( "network.tcp.blocking", true )
- .put( "node.client", true )
- .put( "node.name", nodeName )
+ .put( "cluster.name", fig.getClusterName() )
- .build();
+ // this assumes that we're using zen for host discovery. Putting an
+ // explicit set of bootstrap hosts ensures we connect to a valid cluster.
+ .put( "discovery.zen.ping.unicast.hosts", allHosts )
+ .put( "discovery.zen.ping.multicast.enabled", "false" ).put( "http.enabled", false )
- log.debug("Creating ElasticSearch client with settings: " + settings.getAsMap());
+ .put( "client.transport.ping_timeout", 2000 ) // milliseconds
+ .put( "client.transport.nodes_sampler_interval", 100 ).put( "network.tcp.blocking", true )
+ .put( "node.client", true ).put( "node.name", nodeName )
- // use this client when connecting via socket only,
- // such as ssh tunnel or other firewall issues
- // newClient = new TransportClient(settings).addTransportAddress(
- // new InetSocketTransportAddress("localhost", 9300) );
+ .build();
- //use this client for quick connectivity
- Node node = NodeBuilder.nodeBuilder().settings(settings)
- .client(true).node();
+ log.debug( "Creating ElasticSearch client with settings: " + settings.getAsMap() );
- newClient = node.client();
+ // use this client when connecting via socket only,
+ // such as ssh tunnel or other firewall issues
+ // newClient = new TransportClient(settings).addTransportAddress(
+ // new InetSocketTransportAddress("localhost", 9300) );
- }
- client = newClient;
- }
- return client;
+ //use this client for quick connectivity
+ node = NodeBuilder.nodeBuilder().settings( settings ).client( true ).node();
+ client = node.client();
}
/**
- * Uses a project.properties file that Maven does substitution on to to replace the value of a
- * property with the path to the Maven build directory (a.k.a. target). It then uses this path
- * to generate a random String which it uses to append a path component to so a unique directory
- * is selected. If already present it's deleted, then the directory is created.
+ * Uses a project.properties file that Maven does substitution on to to replace the value of a property with the
+ * path to the Maven build directory (a.k.a. target). It then uses this path to generate a random String which it
+ * uses to append a path component to so a unique directory is selected. If already present it's deleted, then the
+ * directory is created.
*
* @return a unique temporary directory
*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java
new file mode 100644
index 0000000..b351699
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+/**
+ * Monitor for when exceptions occur during communications
+ */
+public interface FailureMonitor {
+
+ /**
+ * Receive the message that the call failed
+ * @param message
+ * @param throwable
+ */
+ public void fail(final String message, final Throwable throwable);
+
+ /**
+ * The call was successful.
+ */
+ public void success();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
new file mode 100644
index 0000000..eb17fac
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.transport.TransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+
+
+/**
+ * Monitors failures
+ */
+public class FailureMonitorImpl implements FailureMonitor {
+
+ private static final Logger LOG = LoggerFactory.getLogger( FailureMonitorImpl.class );
+
+ /**
+ * Exceptions that will cause us to increment our count and restart
+ */
+ private static final Class[] RESTART_EXCEPTIONS =
+ new Class[] { TransportException.class, ClusterBlockException.class };
+
+ /**
+ * Number of consecutive failures when connecting to Elastic Search
+ */
+ private AtomicInteger failCounter = new AtomicInteger();
+
+ private final IndexFig indexFig;
+ private final EsProvider esProvider;
+
+
+ public FailureMonitorImpl( final IndexFig indexFig, final EsProvider esProvider ) {
+ this.indexFig = indexFig;
+ this.esProvider = esProvider;
+ }
+
+
+ @Override
+ public void fail( final String message, final Throwable throwable ) {
+
+ /**
+ * Not a network exception we support restart clients on, abort
+ */
+ if ( !isNetworkException( throwable ) ) {
+ return;
+ }
+
+ final int fails = failCounter.incrementAndGet();
+ final int maxCount = indexFig.getFailRefreshCount();
+
+ if ( fails > maxCount ) {
+ LOG.error( "Unable to connect to elasticsearch. Reason is {}", message, throwable );
+ LOG.warn( "We have failed to connect to Elastic Search {} times. Max allowed is {}. Resetting connection",
+ fails, maxCount );
+
+ esProvider.releaseClient();
+ }
+ }
+
+
+ private boolean isNetworkException( final Throwable throwable ) {
+ for ( Class<?> clazz : RESTART_EXCEPTIONS ) {
+ if ( clazz.isInstance( throwable ) ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+
+ @Override
+ public void success() {
+ failCounter.set( 0 );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
index 6137d44..f1d1678 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
@@ -43,7 +43,7 @@ public class ElasticSearchRule extends EnvironResource {
if ( client == null ) {
Injector injector = Guice.createInjector( new GuicyFigModule( IndexFig.class ) );
IndexFig indexFig = injector.getInstance( IndexFig.class );
- client = EsProvider.getClient( indexFig );
+ client = new EsProvider(indexFig ).getClient( );
}
return client;
}