You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:35 UTC

[07/50] incubator-usergrid git commit: Added buffer wiring to guice and updated the tests.

Added buffer wiring to guice and updated the tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd0015d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd0015d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd0015d5

Branch: refs/heads/two-dot-o
Commit: cd0015d5af1371528943e015022be21b2d8099f9
Parents: c5a4767
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 14:56:47 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 14:56:47 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    |  9 ++++++++
 .../persistence/index/guice/IndexModule.java    | 10 ++++++++-
 .../persistence/index/impl/BufferQueue.java     |  7 ++++++
 .../index/impl/BufferQueueInMemory.java         | 18 ++++++++-------
 .../index/impl/EsIndexBufferConsumerImpl.java   | 23 +++++++++++++++-----
 .../index/impl/EsIndexBufferProducerImpl.java   |  2 +-
 .../index/guice/TestIndexModule.java            |  9 +++++++-
 7 files changed, 62 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index c6f08f6..befbaa9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -64,6 +64,11 @@ public interface IndexFig extends GuicyFig {
      */
     public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
 
+    /**
+     *  Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple backpressure
+     */
+    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME =  "elasticsearch.rejected_retry_wait";
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
     @Default( "127.0.0.1" )
@@ -158,4 +163,8 @@ public interface IndexFig extends GuicyFig {
     @Default("one")
     @Key( INDEX_WRITE_CONSISTENCY_LEVEL )
     String getWriteConsistencyLevel();
+
+    @Default("1000")
+    @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
+    long getFailureRetryTime();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index ebd9098..d911dab 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -32,7 +32,7 @@ import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 
-public class IndexModule extends AbstractModule {
+public abstract class IndexModule extends AbstractModule {
 
     @Override
     protected void configure() {
@@ -48,6 +48,14 @@ public class IndexModule extends AbstractModule {
         bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
         bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
 
+        wireBufferQueue();
     }
 
+
+    /**
+     * Wire the {@link BufferQueue} binding for this implementation
+     */
+    public abstract void wireBufferQueue();
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
index dec6ac3..ffc3b90 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -46,4 +46,11 @@ public interface BufferQueue {
      * @return
      */
     public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
+
+
+    /**
+     * Ack all messages so they do not appear again.  Meant for transactional queues, and may or may not be implemented
+     * @param messages
+     */
+    public void ack(List<IndexOperationMessage> messages);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
index 502f45d..403762f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
@@ -36,15 +36,11 @@ import com.google.inject.Singleton;
 public class BufferQueueInMemory implements BufferQueue {
 
     private final ArrayBlockingQueue<IndexOperationMessage> messages;
-    private final IndexFig fig;
 
 
     @Inject
-    public BufferQueueInMemory( final ArrayBlockingQueue<IndexOperationMessage> messages, final IndexFig fig ) {
-        this.messages = messages;
-
-
-        this.fig = fig;
+    public BufferQueueInMemory(final IndexFig fig ) {
+        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
     }
 
 
@@ -78,8 +74,14 @@ public class BufferQueueInMemory implements BufferQueue {
 
             }
         }
-        while ( response.size() < takeSize &&  System.currentTimeMillis() < endTime );
+        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+        return response;
+    }
+
 
-        return null;
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+         //no op for this
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 19b8438..45c12a1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -39,6 +39,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -70,9 +71,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final BufferQueue bufferQueue;
 
     @Inject
-    public EsIndexBufferConsumerImpl( final IndexFig config, final IndexBufferProducer producer, final EsProvider
-        provider, final MetricsFactory metricsFactory,
-                                      final BufferQueue bufferQueue ){
+    public EsIndexBufferConsumerImpl( final IndexFig config,  final EsProvider
+        provider, final MetricsFactory metricsFactory,   final BufferQueue bufferQueue ){
         this.bufferQueue = bufferQueue;
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
@@ -101,17 +101,30 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         for ( IndexOperationMessage drained : drainList ) {
                             subscriber.onNext( drained );
                         }
-                        drainList.clear();
+
+                        bufferQueue.ack( drainList );
+
                         timer.stop();
 
                         countFail.set( 0 );
                     }
+                    catch( EsRejectedExecutionException err)  {
+                        countFail.incrementAndGet();
+                        log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  Failed {} consecutive times", config.getFailureRetryTime(), countFail.get() );
+
+                       //ES rejected the request (simple backpressure): sleep for the configured retry wait, then retry
+                        try {
+                            Thread.sleep( config.getFailureRetryTime() );
+                        }
+                        catch ( InterruptedException e ) {
+                            //swallow
+                        }
+                    }
                     catch ( Exception e ) {
                         int count = countFail.incrementAndGet();
                         log.error( "failed to dequeue", e );
                         if ( count > 200 ) {
                             log.error( "Shutting down index drain due to repetitive failures" );
-                            //break;
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index f9999b2..db1f50e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -46,7 +46,7 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
     private final BufferQueue bufferQueue;
 
     @Inject
-    public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, IndexFig fig, final BufferQueue bufferQueue ){
+    public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
         this.bufferQueue = bufferQueue;
         this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
         this.timer =  metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 4d68dda..7e2312d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.index.guice;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
 
 
 public class TestIndexModule extends TestModule {
@@ -32,6 +34,11 @@ public class TestIndexModule extends TestModule {
 
         // configure collections and our core astyanax framework
         install( new CollectionModule() );
-        install( new IndexModule() );
+        install( new IndexModule() {
+            @Override
+            public void wireBufferQueue() {
+                bind( BufferQueue.class).to( BufferQueueInMemory.class );
+            }
+        } );
     }
 }