You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/30 20:58:53 UTC

git commit: Added automatic restart to ES client. We seem to get disconnected with large clusters occasionally.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/key-row-sharding 599d9e73c -> 4b972ead9


Added automatic restart to ES client.  We seem to get disconnected with large clusters occasionally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4b972ead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4b972ead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4b972ead

Branch: refs/heads/key-row-sharding
Commit: 4b972ead90ebfcb7d8c9bb964aa332f54a12e0fd
Parents: 599d9e7
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Oct 30 13:58:50 2014 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Oct 30 13:58:50 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpManagerCache.java         |  12 +-
 .../usergrid/persistence/index/IndexFig.java    |  10 +
 .../index/impl/EsEntityIndexBatchImpl.java      |  23 ++-
 .../index/impl/EsEntityIndexImpl.java           | 105 ++++++----
 .../persistence/index/impl/EsProvider.java      | 196 ++++++++-----------
 .../persistence/index/impl/FailureMonitor.java  |  41 ++++
 .../index/impl/FailureMonitorImpl.java          | 101 ++++++++++
 .../index/impl/ElasticSearchRule.java           |   2 +-
 8 files changed, 321 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 99bde22..a73dc6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -44,8 +44,7 @@ public class CpManagerCache {
 
     private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache = 
         CacheBuilder.newBuilder()
-            .maximumSize(100)
-            .expireAfterWrite( 1, TimeUnit.HOURS)
+            .maximumSize(1000)
             .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
                 public EntityCollectionManager load( CollectionScope scope ) { 
                     return ecmf.createCollectionManager( scope );
@@ -55,8 +54,7 @@ public class CpManagerCache {
 
     private LoadingCache<ApplicationScope, EntityIndex> eiCache = 
         CacheBuilder.newBuilder()
-            .maximumSize(100)
-            .expireAfterWrite( 1, TimeUnit.HOURS )
+            .maximumSize(1000)
             .build( new CacheLoader<ApplicationScope, EntityIndex>() {
                 public EntityIndex load( ApplicationScope scope ) { 
                     return eif.createEntityIndex( scope );
@@ -66,8 +64,7 @@ public class CpManagerCache {
 
     private LoadingCache<ApplicationScope, GraphManager> gmCache = 
         CacheBuilder.newBuilder()
-            .maximumSize(100)
-            .expireAfterWrite( 1, TimeUnit.HOURS )
+            .maximumSize(1000)
             .build( new CacheLoader<ApplicationScope, GraphManager>() {
                 public GraphManager load( ApplicationScope scope ) { 
                     return gmf.createEdgeManager( scope );
@@ -77,8 +74,7 @@ public class CpManagerCache {
 
     private LoadingCache<MapScope, MapManager> mmCache = 
         CacheBuilder.newBuilder()
-            .maximumSize(100)
-            .expireAfterWrite( 1, TimeUnit.HOURS )
+            .maximumSize(1000)
             .build( new CacheLoader<MapScope, MapManager>() {
                 public MapManager load( MapScope scope ) { 
                     return mmf.createMapManager( scope );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 142c48a..e10ce44 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -43,6 +43,12 @@ public interface IndexFig extends GuicyFig {
 
     public static final String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
 
+
+    /**
+     * the number of times we can fail before we refresh the client
+     */
+    public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
     
     @Default( "127.0.0.1" )
@@ -82,4 +88,8 @@ public interface IndexFig extends GuicyFig {
     @Default("default")
     @Key( ELASTICSEARCH_NODENAME )
     public String getNodeName();
+
+    @Default( "20" )
+    @Key( ELASTICSEARCH_FAIL_REFRESH )
+    int getFailRefreshCount();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index fab135a..ad58fa8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -88,15 +88,15 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     private int count;
 
+    private final FailureMonitor failureMonitor;
 
-    public EsEntityIndexBatchImpl( 
-            final ApplicationScope applicationScope, 
-            final Client client, 
-            final IndexFig config,
-            final int autoFlushSize ) {
+
+    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client, final IndexFig config,
+                                   final int autoFlushSize, final FailureMonitor failureMonitor ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
+        this.failureMonitor = failureMonitor;
         this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
         this.refresh = config.isForcedRefresh();
         this.autoFlushSize = autoFlushSize;
@@ -208,7 +208,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
             return;
         }
 
-        final BulkResponse responses = request.execute().actionGet();
+        final BulkResponse responses;
+
+        try {
+            responses = request.execute().actionGet();
+        }catch(Throwable t){
+            log.error( "Unable to communicate with elasticsearch" );
+            failureMonitor.fail( "Unable to execute batch", t );
+            throw t;
+        }
+
+
+        failureMonitor.success();
 
         for ( BulkItemResponse response : responses ) {
             if ( response.isFailed() ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index fc09b5a..f8e73c2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -81,16 +81,22 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     private static final AtomicBoolean mappingsCreated = new AtomicBoolean( false );
 
+    /**
+     * We purposefully make this per instance. Some indexes may work, while others may fail
+     */
+    private FailureMonitor failureMonitor;
+
     private final String indexName;
 
     private final ApplicationScope applicationScope;
 
-    private final Client client;
+    private final EsProvider esProvider;
 
     private final int cursorTimeout;
 
     private final IndexFig config;
 
+
     //number of times to wait for the index to refresh properly.
     private static final int MAX_WAITS = 10;
     //number of milliseconds to try again before sleeping
@@ -111,10 +117,11 @@ public class EsEntityIndexImpl implements EntityIndex {
         ValidationUtils.validateApplicationScope( appScope );
 
         this.applicationScope = appScope;
-        this.client = provider.getClient();
+        this.esProvider = provider;
         this.config = config;
         this.cursorTimeout = config.getQueryCursorTimeout();
         this.indexName = IndexingUtils.createIndexName( config.getIndexPrefix(), appScope );
+        this.failureMonitor = new FailureMonitorImpl( config, provider );
     }
 
 
@@ -126,7 +133,7 @@ public class EsEntityIndexImpl implements EntityIndex {
                 createMappings();
             }
 
-            AdminClient admin = client.admin();
+            AdminClient admin = esProvider.getClient().admin();
             CreateIndexResponse cir = admin.indices().prepareCreate( indexName ).execute().actionGet();
             log.info( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
 
@@ -134,7 +141,8 @@ public class EsEntityIndexImpl implements EntityIndex {
 
             // Immediately create a document and remove it to ensure the entire cluster is ready 
             // to receive documents. Occasionally we see errors.  See this post:
-            // http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
+            // http://elasticsearch-users.115913.n3.nabble
+            // .com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
 
             testNewIndex();
         }
@@ -164,18 +172,16 @@ public class EsEntityIndexImpl implements EntityIndex {
             public boolean doOp() {
                 final String tempId = UUIDGenerator.newTimeUUID().toString();
 
-                client.prepareIndex( indexName, VERIFY_TYPE, tempId )
-                        .setSource( DEFAULT_PAYLOAD ).get();
+                esProvider.getClient().prepareIndex( indexName, VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD ).get();
 
-                log.info( "Successfully created new document with docId {} in index {} and type {}", 
-                        tempId, indexName, VERIFY_TYPE );
+                log.info( "Successfully created new document with docId {} in index {} and type {}", tempId, indexName,
+                        VERIFY_TYPE );
 
                 // delete all types, this way if we miss one it will get cleaned up
-                client.prepareDeleteByQuery( indexName ).setTypes( VERIFY_TYPE )
-                        .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
+                esProvider.getClient().prepareDeleteByQuery( indexName ).setTypes( VERIFY_TYPE ).setQuery( MATCH_ALL_QUERY_BUILDER )
+                      .get();
 
-                log.info( "Successfully deleted all documents in index {} and type {}", 
-                        indexName, VERIFY_TYPE );
+                log.info( "Successfully deleted all documents in index {} and type {}", indexName, VERIFY_TYPE );
 
                 return true;
             }
@@ -186,25 +192,24 @@ public class EsEntityIndexImpl implements EntityIndex {
 
 
     /**
-     * Setup ElasticSearch type mappings as a template that applies to all new indexes. 
-     * Applies to all indexes that start with our prefix.
+     * Setup ElasticSearch type mappings as a template that applies to all new indexes. Applies to all indexes that
+     * start with our prefix.
      */
     private void createMappings() throws IOException {
 
-        XContentBuilder xcb = IndexingUtils
-                .createDoubleStringIndexMapping( XContentFactory.jsonBuilder(), "_default_" );
+        XContentBuilder xcb =
+                IndexingUtils.createDoubleStringIndexMapping( XContentFactory.jsonBuilder(), "_default_" );
 
-        PutIndexTemplateResponse pitr = client.admin().indices()
-            .preparePutTemplate( "usergrid_template" )
-            .setTemplate( config.getIndexPrefix() + "*" )
-            .addMapping( "_default_", xcb ) // set mapping as the default for all types
-            .execute().actionGet();
+        PutIndexTemplateResponse pitr = esProvider.getClient().admin().indices().preparePutTemplate( "usergrid_template" )
+                                              .setTemplate( config.getIndexPrefix() + "*" ).addMapping( "_default_",
+                        xcb ) // set mapping as the default for all types
+                .execute().actionGet();
     }
 
 
     @Override
     public EntityIndexBatch createBatch() {
-        return new EsEntityIndexBatchImpl( applicationScope, client, config, 1000 );
+        return new EsEntityIndexBatchImpl( applicationScope, esProvider.getClient(), config, 1000, failureMonitor );
     }
 
 
@@ -224,8 +229,9 @@ public class EsEntityIndexImpl implements EntityIndex {
         SearchResponse searchResponse;
         if ( query.getCursor() == null ) {
 
-            SearchRequestBuilder srb = client.prepareSearch( indexName )
-                    .setTypes( indexType ).setScroll( cursorTimeout + "m" ) .setQuery( qb );
+            SearchRequestBuilder srb =
+                    esProvider.getClient().prepareSearch( indexName ).setTypes( indexType ).setScroll( cursorTimeout + "m" )
+                          .setQuery( qb );
 
             FilterBuilder fb = query.createFilterBuilder();
             if ( fb != null ) {
@@ -251,25 +257,35 @@ public class EsEntityIndexImpl implements EntityIndex {
                 // to ignore any fields that are not present.
 
                 final String stringFieldName = STRING_PREFIX + sp.getPropertyName();
-                final FieldSortBuilder stringSort = SortBuilders.fieldSort( stringFieldName )
-                        .order( order ).ignoreUnmapped( true );
+                final FieldSortBuilder stringSort =
+                        SortBuilders.fieldSort( stringFieldName ).order( order ).ignoreUnmapped( true );
                 srb.addSort( stringSort );
                 log.debug( "   Sort: {} order by {}", stringFieldName, order.toString() );
 
                 final String numberFieldName = NUMBER_PREFIX + sp.getPropertyName();
-                final FieldSortBuilder numberSort = SortBuilders.fieldSort( numberFieldName )
-                        .order( order ).ignoreUnmapped( true );
+                final FieldSortBuilder numberSort =
+                        SortBuilders.fieldSort( numberFieldName ).order( order ).ignoreUnmapped( true );
                 srb.addSort( numberSort );
                 log.debug( "   Sort: {} order by {}", numberFieldName, order.toString() );
 
                 final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName();
-                final FieldSortBuilder booleanSort = SortBuilders.fieldSort( booleanFieldName )
-                        .order( order ).ignoreUnmapped( true );
+                final FieldSortBuilder booleanSort =
+                        SortBuilders.fieldSort( booleanFieldName ).order( order ).ignoreUnmapped( true );
                 srb.addSort( booleanSort );
                 log.debug( "   Sort: {} order by {}", booleanFieldName, order.toString() );
             }
 
-            searchResponse = srb.execute().actionGet();
+            try {
+                searchResponse = srb.execute().actionGet();
+            }
+            catch ( Throwable t ) {
+                log.error( "Unable to communicate with elasticsearch" );
+                failureMonitor.fail( "Unable to execute batch", t );
+                throw t;
+            }
+
+
+            failureMonitor.success();
         }
         else {
             String scrollId = query.getCursor();
@@ -281,9 +297,19 @@ public class EsEntityIndexImpl implements EntityIndex {
             }
             log.debug( "Executing query with cursor: {} ", scrollId );
 
-            SearchScrollRequestBuilder ssrb = client.prepareSearchScroll( scrollId )
-                    .setScroll( cursorTimeout + "m" );
-            searchResponse = ssrb.execute().actionGet();
+            SearchScrollRequestBuilder ssrb = esProvider.getClient().prepareSearchScroll( scrollId ).setScroll( cursorTimeout + "m" );
+
+            try {
+                searchResponse = ssrb.execute().actionGet();
+            }
+            catch ( Throwable t ) {
+                log.error( "Unable to communicate with elasticsearch" );
+                failureMonitor.fail( "Unable to execute batch", t );
+                throw t;
+            }
+
+
+            failureMonitor.success();
         }
 
         SearchHits hits = searchResponse.getHits();
@@ -323,12 +349,12 @@ public class EsEntityIndexImpl implements EntityIndex {
             @Override
             public boolean doOp() {
                 try {
-                    client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+                    esProvider.getClient().admin().indices().prepareRefresh( indexName ).execute().actionGet();
                     log.debug( "Refreshed index: " + indexName );
                     return true;
                 }
                 catch ( IndexMissingException e ) {
-                    log.error( "Unable to refresh index after create. Waiting before sleeping.", e);
+                    log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
                     throw e;
                 }
             }
@@ -353,7 +379,7 @@ public class EsEntityIndexImpl implements EntityIndex {
      * For testing only.
      */
     public void deleteIndex() {
-        AdminClient adminClient = client.admin();
+        AdminClient adminClient = esProvider.getClient().admin();
         DeleteIndexResponse response = adminClient.indices().prepareDelete( indexName ).get();
         if ( response.isAcknowledged() ) {
             log.info( "Deleted index: " + indexName );
@@ -366,13 +392,12 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     /**
      * Do the retry operation
-     * @param operation
      */
     private void doInRetry( final RetryOperation operation ) {
         for ( int i = 0; i < MAX_WAITS; i++ ) {
 
             try {
-                if(operation.doOp()){
+                if ( operation.doOp() ) {
                     return;
                 }
             }
@@ -401,4 +426,6 @@ public class EsEntityIndexImpl implements EntityIndex {
          */
         public boolean doOp();
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
index efc61a9..eb99e3e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
@@ -18,17 +18,13 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Properties;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.usergrid.persistence.core.util.AvailablePortFinder;
-import org.apache.usergrid.persistence.index.IndexFig;
+
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -37,6 +33,14 @@ import org.elasticsearch.node.NodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
 
 /**
  * Provides access to ElasticSearch client and, optionally, embedded ElasticSearch for testing.
@@ -44,154 +48,116 @@ import org.slf4j.LoggerFactory;
 @Singleton
 public class EsProvider {
 
-    private static final Logger log = LoggerFactory.getLogger(EsProvider.class);
+    private static final Logger log = LoggerFactory.getLogger( EsProvider.class );
 
     private final IndexFig indexFig;
     private static Client client;
+    private static Node node;
 
     public static String LOCAL_ES_PORT_PROPNAME = "EMBEDDED_ES_PORT";
 
 
     @Inject
-    public EsProvider(IndexFig fig) {
+    public EsProvider( IndexFig fig ) {
         this.indexFig = fig;
     }
 
-    public synchronized Client getClient() {
-        if (client == null) {
-            client = getClient(indexFig);
+
+    /**
+     * Get the client instance
+     */
+    public Client getClient() {
+        if ( client == null ) {
+            //synchronize on creating the client so we don't create too many
+            createClient( indexFig );
         }
         return client;
     }
 
-    
+
+    /**
+     * Reset the client instance
+     */
     public void releaseClient() {
-        client = null;
+        //reset our static variables
+        if ( client != null && node != null ) {
+            node.stop();
+            node = null;
+            client = null;
+        }
     }
 
 
-    public static synchronized Client getClient(IndexFig fig) {
-
-        if (client == null) {
-
-            Client newClient = null;
-
-            if ("embedded".equals( fig.getStartUp()) ) {
+    private synchronized void createClient( IndexFig fig ) {
 
-                int port = AvailablePortFinder.getNextAvailable(2000);
 
-                System.setProperty( LOCAL_ES_PORT_PROPNAME, port+"" );
-
-                File tempDir;
-                try {
-                    tempDir = getTempDirectory();
-                } catch (Exception ex) {
-                    throw new RuntimeException(
-                        "Fatal error unable to create temp dir, start embedded ElasticSearch", ex);
-                }
-
-                Settings settings = ImmutableSettings.settingsBuilder()
-
-                    .put("cluster.name", fig.getClusterName())
-
-                    .put("network.publish_host","127.0.0.1")
-                    .put("transport.tcp.port", port)
-                    .put("discovery.zen.ping.multicast.enabled","false")
-                    .put("node.http.enabled", false)
-
-                    .put("path.logs", tempDir.toString())
-                    .put("path.data", tempDir.toString())
-
-                    .put("gateway.type", "none")
-                    .put("index.store.type", "memory")
-                    .put("index.number_of_shards", 1)
-                    .put("index.number_of_replicas", 1)
-                    .build();
-
-                log.info("-----------------------------------------------------------------------");
-                log.info("Starting ElasticSearch embedded server settings: \n"+settings.getAsMap());
-                log.info("-----------------------------------------------------------------------");
-
-                Node node = NodeBuilder.nodeBuilder().settings(settings)
-                    .clusterName( fig.getClusterName() ).node();
-
-                newClient = node.client();
-
-
-            } else {
-                
-                String allHosts = "";
-
-                if ("remote".equals( fig.getStartUp()) ) { 
-
-                    // we will connect to ES on all configured hosts
-                    String SEP = "";
-                    for (String host : fig.getHosts().split(",")) {
-                        allHosts = allHosts + SEP + host + ":" + fig.getPort();
-                        SEP = ",";
-                    }
+        if ( client != null && node != null) {
+            return;
+        }
 
-                } else {
 
-                    // we will connect to forked ES on localhost
-                    allHosts = "localhost:" + System.getProperty(LOCAL_ES_PORT_PROPNAME);
-                }
+        String allHosts = "";
 
-                String nodeName = fig.getNodeName();
-                if ( "default".equals( nodeName )) {
-                    // no nodeName was specified, use hostname
-                    try {
-                        nodeName = InetAddress.getLocalHost().getHostName();
+        if ( "remote".equals( fig.getStartUp() ) ) {
 
-                    } catch (UnknownHostException ex) {
-                        nodeName = "client-" + RandomStringUtils.randomAlphabetic(8);
-                        log.warn("Couldn't get hostname to use as ES node name, using " + nodeName);
-                    }
-                }
+            // we will connect to ES on all configured hosts
+            String SEP = "";
+            for ( String host : fig.getHosts().split( "," ) ) {
+                allHosts = allHosts + SEP + host + ":" + fig.getPort();
+                SEP = ",";
+            }
+        }
+        else {
 
-                Settings settings = ImmutableSettings.settingsBuilder()
+            // we will connect to forked ES on localhost
+            allHosts = "localhost:" + System.getProperty( LOCAL_ES_PORT_PROPNAME );
+        }
 
-                    .put( "cluster.name", fig.getClusterName() )
+        String nodeName = fig.getNodeName();
+        if ( "default".equals( nodeName ) ) {
+            // no nodeName was specified, use hostname
+            try {
+                nodeName = InetAddress.getLocalHost().getHostName();
+            }
+            catch ( UnknownHostException ex ) {
+                nodeName = "client-" + RandomStringUtils.randomAlphabetic( 8 );
+                log.warn( "Couldn't get hostname to use as ES node name, using " + nodeName );
+            }
+        }
 
-                    // this assumes that we're using zen for host discovery.  Putting an 
-                    // explicit set of bootstrap hosts ensures we connect to a valid cluster.
-                    .put( "discovery.zen.ping.unicast.hosts", allHosts )
-                    .put( "discovery.zen.ping.multicast.enabled", "false" )
-                    .put("http.enabled", false) 
+        Settings settings = ImmutableSettings.settingsBuilder()
 
-                    .put( "client.transport.ping_timeout", 2000 ) // milliseconds
-                    .put( "client.transport.nodes_sampler_interval", 100 )
-                    .put( "network.tcp.blocking", true )
-                    .put( "node.client", true )
-                    .put( "node.name", nodeName )
+                .put( "cluster.name", fig.getClusterName() )
 
-                    .build();
+                        // this assumes that we're using zen for host discovery.  Putting an
+                        // explicit set of bootstrap hosts ensures we connect to a valid cluster.
+                .put( "discovery.zen.ping.unicast.hosts", allHosts )
+                .put( "discovery.zen.ping.multicast.enabled", "false" ).put( "http.enabled", false )
 
-                log.debug("Creating ElasticSearch client with settings: " +  settings.getAsMap());
+                .put( "client.transport.ping_timeout", 2000 ) // milliseconds
+                .put( "client.transport.nodes_sampler_interval", 100 ).put( "network.tcp.blocking", true )
+                .put( "node.client", true ).put( "node.name", nodeName )
 
-                // use this client when connecting via socket only, 
-                // such as ssh tunnel or other firewall issues
-                // newClient  = new TransportClient(settings).addTransportAddress( 
-                //                  new InetSocketTransportAddress("localhost", 9300) );
+                .build();
 
-                //use this client for quick connectivity
-                Node node = NodeBuilder.nodeBuilder().settings(settings)
-                    .client(true).node();
+        log.debug( "Creating ElasticSearch client with settings: " + settings.getAsMap() );
 
-                newClient = node.client();
+        // use this client when connecting via socket only,
+        // such as ssh tunnel or other firewall issues
+        // newClient  = new TransportClient(settings).addTransportAddress(
+        //                  new InetSocketTransportAddress("localhost", 9300) );
 
-            }
-                client = newClient;
-        }
-        return client;
+        //use this client for quick connectivity
+        node = NodeBuilder.nodeBuilder().settings( settings ).client( true ).node();
+        client = node.client();
     }
 
 
     /**
-     * Uses a project.properties file that Maven does substitution on to to replace the value of a 
-     * property with the path to the Maven build directory (a.k.a. target). It then uses this path 
-     * to generate a random String which it uses to append a path component to so a unique directory 
-     * is selected. If already present it's deleted, then the directory is created.
+     * Uses a project.properties file that Maven does substitution on to replace the value of a property with the
+     * path to the Maven build directory (a.k.a. target). It then uses this path to generate a random String which it
+     * uses to append a path component to so a unique directory is selected. If already present it's deleted, then the
+     * directory is created.
      *
      * @return a unique temporary directory
      *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java
new file mode 100644
index 0000000..b351699
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitor.java
@@ -0,0 +1,41 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+/**
+ * Monitor for when exceptions occur during communications
+ */
+public interface FailureMonitor {
+
+    /**
+     * Receive the message that the call failed
+     * @param message description of the failure
+     * @param throwable the exception raised by the failed call
+     */
+    public void fail(final String message, final Throwable throwable);
+
+    /**
+     * The call was successful.
+     */
+    public void success();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
new file mode 100644
index 0000000..eb17fac
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
@@ -0,0 +1,101 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.transport.TransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+
+
+/**
+ * Releases the ES client once consecutive restartable failures exceed {@link IndexFig#getFailRefreshCount()}.
+ */
+public class FailureMonitorImpl implements FailureMonitor {
+
+    private static final Logger LOG = LoggerFactory.getLogger( FailureMonitorImpl.class );
+
+    /**
+     * Exceptions that will cause us to increment our count and restart
+     */
+    private static final Class<?>[] RESTART_EXCEPTIONS =
+            new Class<?>[] { TransportException.class, ClusterBlockException.class };
+
+    /**
+     * Number of consecutive failures when connecting to Elastic Search
+     */
+    private final AtomicInteger failCounter = new AtomicInteger();
+
+    private final IndexFig indexFig;
+    private final EsProvider esProvider;
+
+
+    public FailureMonitorImpl( final IndexFig indexFig, final EsProvider esProvider ) {
+        this.indexFig = indexFig;
+        this.esProvider = esProvider;
+    }
+
+
+    /** Counts a restartable failure; past the configured max, releases the client and restarts the count. */
+    @Override
+    public void fail( final String message, final Throwable throwable ) {
+        // Not a network exception we support restarting clients on, abort
+        if ( !isNetworkException( throwable ) ) {
+            return;
+        }
+
+        final int fails = failCounter.incrementAndGet();
+        final int maxCount = indexFig.getFailRefreshCount();
+
+        if ( fails > maxCount ) {
+            LOG.error( "Unable to connect to elasticsearch.  Reason is {}", message, throwable );
+            LOG.warn( "We have failed to connect to Elastic Search {} times.  Max allowed is {}.  Resetting connection",
+                    fails, maxCount );
+
+            // Start a fresh count, otherwise every later failure would release the client again
+            failCounter.set( 0 );
+            esProvider.releaseClient();
+        }
+    }
+
+
+    private boolean isNetworkException( final Throwable throwable ) {
+        for ( Class<?> clazz : RESTART_EXCEPTIONS ) {
+            if ( clazz.isInstance( throwable ) ) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
+    @Override
+    public void success() {
+        failCounter.set( 0 );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4b972ead/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
index 6137d44..f1d1678 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/ElasticSearchRule.java
@@ -43,7 +43,7 @@ public class ElasticSearchRule extends EnvironResource {
         if ( client == null ) {
             Injector injector = Guice.createInjector( new GuicyFigModule( IndexFig.class ) );
             IndexFig indexFig = injector.getInstance( IndexFig.class );            
-            client = EsProvider.getClient( indexFig );
+            client = new EsProvider(indexFig  ).getClient( );
         }
         return client;
     }