You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:38:01 UTC
[33/50] incubator-usergrid git commit: Added the ability to specify
the number of workers
Added the ability to specify the number of workers
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9aec790b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9aec790b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9aec790b
Branch: refs/heads/two-dot-o
Commit: 9aec790b3fd8486ce6dba67125f94a5d912e8596
Parents: d81dfaa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 18:27:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 18:27:16 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/IndexFig.java | 9 ++++
.../index/impl/EsIndexBufferConsumerImpl.java | 55 +++++++++++++++-----
.../index/guice/TestIndexModule.java | 4 +-
3 files changed, 52 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index cde86fd..445789f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -81,6 +81,11 @@ public interface IndexFig extends GuicyFig {
*/
public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+ /**
+ * The number of worker threads to consume from the queue
+ */
+ public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@Default( "127.0.0.1" )
@@ -181,4 +186,8 @@ public interface IndexFig extends GuicyFig {
@Key( INDEX_QUEUE_READ_TIMEOUT )
int getIndexQueueTimeout();
+ @Default("2")
+ @Key( ELASTICSEARCH_WORKER_COUNT )
+ int getWorkerCount();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 862b1ae..836ec3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -49,9 +49,12 @@ import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Consumer for IndexOperationMessages
@@ -69,15 +72,18 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Meter flushMeter;
private final Timer produceTimer;
private final BufferQueue bufferQueue;
+ private final IndexFig indexFig;
+ private final AtomicLong counter = new AtomicLong( );
//the actively running subscription
- private Subscription subscription;
+ private List<Subscription> subscriptions;
private Object mutex = new Object();
@Inject
- public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider
- provider, final MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
+ public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
+ metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
+
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -86,15 +92,42 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.client = provider.getClient();
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
this.bufferQueue = bufferQueue;
+ this.indexFig = indexFig;
-
+ subscriptions = new ArrayList<>( indexFig.getWorkerCount() );
//batch up sets of some size and send them in batch
start();
}
+ /**
+ * Loop through and start the workers
+ */
public void start() {
+ final int count = indexFig.getWorkerCount();
+
+ for(int i = 0; i < count; i ++){
+ startWorker();
+ }
+ }
+
+
+ /**
+ * Stop the workers
+ */
+ public void stop() {
+ synchronized ( mutex ) {
+ //stop consuming
+
+ for(final Subscription subscription: subscriptions){
+ subscription.unsubscribe();
+ }
+ }
+ }
+
+
+ private void startWorker(){
synchronized ( mutex) {
final AtomicInteger countFail = new AtomicInteger();
@@ -104,7 +137,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
//name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+ Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
List<IndexOperationMessage> drainList;
do {
@@ -168,20 +201,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
} );
//start in the background
- subscription = consumer.subscribe();
- }
- }
+ final Subscription subscription = consumer.subscribe();
- public void stop() {
- synchronized ( mutex ) {
- //stop consuming
- if(subscription != null) {
- subscription.unsubscribe();
- }
+ subscriptions.add(subscription );
}
}
+
/**
* Execute the request, check for errors, then re-init the batch for future use
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7d7a18d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
install( new IndexModule() {
@Override
public void wireBufferQueue() {
-// bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
- bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+ bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+// bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
}
} );
}