You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/09 16:25:29 UTC
[16/31] incubator-usergrid git commit: changing buffer impl to queue
changing buffer impl to queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5ad1a8c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5ad1a8c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5ad1a8c9
Branch: refs/heads/USERGRID-405
Commit: 5ad1a8c953fc3b8a6b48e6d0ca3c7c0da52ec8d1
Parents: 615a5af
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 6 15:47:28 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 6 15:47:28 2015 -0700
----------------------------------------------------------------------
.../persistence/index/IndexBufferProducer.java | 10 +++--
.../usergrid/persistence/index/IndexFig.java | 12 +++++-
.../index/impl/EsIndexBufferConsumerImpl.java | 41 +++++++++++++++++++-
.../index/impl/EsIndexBufferProducerImpl.java | 30 +++++++++-----
.../persistence/index/impl/EntityIndexTest.java | 2 +-
5 files changed, 77 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
index 6338a0c..19d224c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -24,13 +24,15 @@ import org.apache.usergrid.persistence.index.IndexOperationMessage;
import rx.Observable;
import rx.Subscriber;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
/**
* Classy class class.
*/
-public interface IndexBufferProducer extends Observable.OnSubscribe<IndexOperationMessage> {
-
- @Override
- void call(Subscriber<? super IndexOperationMessage> subscriber);
+public interface IndexBufferProducer {
BetterFuture put(IndexOperationMessage message);
+
+ BlockingQueue<IndexOperationMessage> getSource();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 9893ca5..c6f08f6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -51,6 +51,8 @@ public interface IndexFig extends GuicyFig {
public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
+ public static final String INDEX_QUEUE_SIZE = "elasticsearch.queue_size";
+
public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
@@ -127,7 +129,7 @@ public interface IndexFig extends GuicyFig {
*/
@Default("250")
@Key( INDEX_BUFFER_TIMEOUT )
- int getIndexBufferTimeout();
+ long getIndexBufferTimeout();
/**
* size of the buffer to build up before you send results
@@ -138,6 +140,14 @@ public interface IndexFig extends GuicyFig {
int getIndexBufferSize();
/**
+ * size of the buffer to build up before you send results
+ * @return
+ */
+ @Default("1000")
+ @Key( INDEX_QUEUE_SIZE )
+ int getIndexQueueSize();
+
+ /**
* Request batch size for ES
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 52d1abb..625f4e7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -42,12 +42,14 @@ import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
+import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* Consumer for IndexOperationMessages
@@ -63,6 +65,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Timer flushTimer;
private final Counter indexSizeCounter;
private final Meter flushMeter;
+ private final Timer produceTimer;
@Inject
public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){
@@ -72,14 +75,48 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.config = config;
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.client = provider.getClient();
+ this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
+ final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
//batch up sets of some size and send them in batch
- this.consumer = Observable.create(producer)
+ this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
+ @Override
+ public void call(final Subscriber<? super IndexOperationMessage> subscriber) {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
+ do {
+ try {
+ Timer.Context timer = produceTimer.time();
+ IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
+ if(polled!=null) {
+ drainList.add(polled);
+ producerQueue.drainTo(drainList, config.getIndexBufferSize());
+ System.out.println("Consumer Thread" + Thread.currentThread().getName());
+ for(IndexOperationMessage drained : drainList){
+ subscriber.onNext(drained);
+ }
+ drainList.clear();
+ }
+ timer.stop();
+
+ } catch (InterruptedException ie) {
+ log.error("failed to dequeue", ie);
+ }
+ } while (true);
+ }
+ });
+ thread.setName("EsEntityIndex_Consumer");
+ thread.start();
+ }
+ })
.subscribeOn(Schedulers.io())
.buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
.doOnNext(new Action1<List<IndexOperationMessage>>() {
@Override
public void call(List<IndexOperationMessage> containerList) {
+ System.out.println("Buffered Consumer Thread" + Thread.currentThread().getName());
if (containerList.size() > 0) {
flushMeter.mark(containerList.size());
Timer.Context time = flushTimer.time();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index 29f243b..d4d621f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -20,15 +20,20 @@
package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import rx.Subscriber;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
/**
* Producer for index operation messages
*/
@@ -36,22 +41,27 @@ import rx.Subscriber;
public class EsIndexBufferProducerImpl implements IndexBufferProducer {
private final Counter indexSizeCounter;
- private Subscriber<? super IndexOperationMessage> subscriber;
+ private final ArrayBlockingQueue<IndexOperationMessage> messages;
+ private final Timer timer;
@Inject
- public EsIndexBufferProducerImpl(MetricsFactory metricsFactory){
- this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class,"index.buffer.size");
-
- }
- @Override
- public void call(Subscriber<? super IndexOperationMessage> subscriber) {
- this.subscriber = subscriber;
+ public EsIndexBufferProducerImpl(MetricsFactory metricsFactory,IndexFig fig){
+ this.messages = new ArrayBlockingQueue<>(fig.getIndexQueueSize()*5);
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
+ this.timer = metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
}
public BetterFuture put(IndexOperationMessage message){
- Preconditions.checkNotNull(message,"Message cannot be null");
+ Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getOperations().size());
- subscriber.onNext(message);
+ Timer.Context time = timer.time();
+ messages.offer(message);
+ time.stop();
return message.getFuture();
}
+
+ @Override
+ public BlockingQueue<IndexOperationMessage> getSource() {
+ return messages;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 22aadc0..70ae8c5 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -96,7 +96,7 @@ public class EntityIndexTest extends BaseIT {
long now = System.currentTimeMillis();
final int threads = 20;
- final int size = 20;
+ final int size = 30;
final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
final IndexScope indexScope = new IndexScopeImpl(appId, "things");
final String entityType = "thing";