You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:38:01 UTC

[33/50] incubator-usergrid git commit: Added the ability to specify the number of workers

Added the ability to specify the number of workers


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9aec790b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9aec790b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9aec790b

Branch: refs/heads/two-dot-o
Commit: 9aec790b3fd8486ce6dba67125f94a5d912e8596
Parents: d81dfaa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 18:27:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 18:27:16 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    |  9 ++++
 .../index/impl/EsIndexBufferConsumerImpl.java   | 55 +++++++++++++++-----
 .../index/guice/TestIndexModule.java            |  4 +-
 3 files changed, 52 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index cde86fd..445789f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -81,6 +81,11 @@ public interface IndexFig extends GuicyFig {
      */
     public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
 
+    /**
+     * The number of worker threads to consume from the queue
+     */
+    public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
     @Default( "127.0.0.1" )
@@ -181,4 +186,8 @@ public interface IndexFig extends GuicyFig {
     @Key( INDEX_QUEUE_READ_TIMEOUT )
     int getIndexQueueTimeout();
 
+    @Default("2")
+    @Key( ELASTICSEARCH_WORKER_COUNT )
+    int getWorkerCount();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 862b1ae..836ec3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -49,9 +49,12 @@ import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * Consumer for IndexOperationMessages
@@ -69,15 +72,18 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final Meter flushMeter;
     private final Timer produceTimer;
     private final BufferQueue bufferQueue;
+    private final IndexFig indexFig;
+    private final AtomicLong counter = new AtomicLong(  );
 
     //the actively running subscription
-    private Subscription subscription;
+    private List<Subscription> subscriptions;
 
     private Object mutex = new Object();
 
     @Inject
-    public EsIndexBufferConsumerImpl( final IndexFig config,  final EsProvider
-        provider, final MetricsFactory metricsFactory,   final BufferQueue bufferQueue ){
+    public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
+        metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
+
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
         this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -86,15 +92,42 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.client = provider.getClient();
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
         this.bufferQueue = bufferQueue;
+        this.indexFig = indexFig;
 
-
+        subscriptions = new ArrayList<>( indexFig.getWorkerCount() );
 
         //batch up sets of some size and send them in batch
           start();
     }
 
 
+    /**
+     * Loop through and start the workers
+     */
     public void start() {
+        final int count = indexFig.getWorkerCount();
+
+        for(int i = 0; i < count; i ++){
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Stop the workers
+     */
+    public void stop() {
+        synchronized ( mutex ) {
+            //stop consuming
+
+            for(final Subscription subscription: subscriptions){
+                subscription.unsubscribe();
+            }
+        }
+    }
+
+
+    private void startWorker(){
         synchronized ( mutex) {
 
             final AtomicInteger countFail = new AtomicInteger();
@@ -104,7 +137,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
                     //name our thread so it's easy to see
-                    Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+                    Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
                     List<IndexOperationMessage> drainList;
                     do {
@@ -168,20 +201,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 } );
 
             //start in the background
-            subscription = consumer.subscribe();
-        }
-    }
 
+           final Subscription subscription = consumer.subscribe();
 
-    public void stop() {
-        synchronized ( mutex ) {
-            //stop consuming
-            if(subscription != null) {
-                subscription.unsubscribe();
-            }
+            subscriptions.add(subscription );
         }
     }
 
+
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7d7a18d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
         install( new IndexModule() {
             @Override
             public void wireBufferQueue() {
-//                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
             }
         } );
     }