You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/09 16:14:33 UTC

[16/30] incubator-usergrid git commit: changing buffer impl to queue

changing buffer impl to queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5ad1a8c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5ad1a8c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5ad1a8c9

Branch: refs/heads/two-dot-o
Commit: 5ad1a8c953fc3b8a6b48e6d0ca3c7c0da52ec8d1
Parents: 615a5af
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 6 15:47:28 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 6 15:47:28 2015 -0700

----------------------------------------------------------------------
 .../persistence/index/IndexBufferProducer.java  | 10 +++--
 .../usergrid/persistence/index/IndexFig.java    | 12 +++++-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 41 +++++++++++++++++++-
 .../index/impl/EsIndexBufferProducerImpl.java   | 30 +++++++++-----
 .../persistence/index/impl/EntityIndexTest.java |  2 +-
 5 files changed, 77 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
index 6338a0c..19d224c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -24,13 +24,15 @@ import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import rx.Observable;
 import rx.Subscriber;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
 /**
  * Classy class class.
  */
-public interface IndexBufferProducer extends Observable.OnSubscribe<IndexOperationMessage> {
-
-    @Override
-    void call(Subscriber<? super IndexOperationMessage> subscriber);
+public interface IndexBufferProducer {
 
     BetterFuture put(IndexOperationMessage message);
+
+    BlockingQueue<IndexOperationMessage> getSource();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 9893ca5..c6f08f6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -51,6 +51,8 @@ public interface IndexFig extends GuicyFig {
 
     public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
 
+    public static final String INDEX_QUEUE_SIZE = "elasticsearch.queue_size";
+
     public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
 
     public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
@@ -127,7 +129,7 @@ public interface IndexFig extends GuicyFig {
      */
     @Default("250")
     @Key( INDEX_BUFFER_TIMEOUT )
-    int getIndexBufferTimeout();
+    long getIndexBufferTimeout();
 
     /**
      * size of the buffer to build up before you send results
@@ -138,6 +140,14 @@ public interface IndexFig extends GuicyFig {
     int getIndexBufferSize();
 
     /**
+     * size of the buffer to build up before you send results
+     * @return
+     */
+    @Default("1000")
+    @Key( INDEX_QUEUE_SIZE )
+    int getIndexQueueSize();
+
+    /**
      * Request batch size for ES
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 52d1abb..625f4e7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -42,12 +42,14 @@ import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
+import rx.Subscriber;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * Consumer for IndexOperationMessages
@@ -63,6 +65,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final Timer flushTimer;
     private final Counter indexSizeCounter;
     private final Meter flushMeter;
+    private final Timer produceTimer;
 
     @Inject
     public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){
@@ -72,14 +75,48 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.config = config;
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.client = provider.getClient();
+        this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
+        final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
 
         //batch up sets of some size and send them in batch
-        this.consumer = Observable.create(producer)
+        this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
+            @Override
+            public void call(final Subscriber<? super IndexOperationMessage> subscriber) {
+                Thread thread = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
+                        do {
+                            try {
+                                Timer.Context timer = produceTimer.time();
+                                IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
+                                if(polled!=null) {
+                                    drainList.add(polled);
+                                    producerQueue.drainTo(drainList, config.getIndexBufferSize());
+                                    System.out.println("Consumer Thread" + Thread.currentThread().getName());
+                                    for(IndexOperationMessage drained : drainList){
+                                        subscriber.onNext(drained);
+                                    }
+                                    drainList.clear();
+                                }
+                                timer.stop();
+
+                            } catch (InterruptedException ie) {
+                                log.error("failed to dequeue", ie);
+                            }
+                        } while (true);
+                    }
+                });
+                thread.setName("EsEntityIndex_Consumer");
+                thread.start();
+            }
+        })
             .subscribeOn(Schedulers.io())
             .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
             .doOnNext(new Action1<List<IndexOperationMessage>>() {
                 @Override
                 public void call(List<IndexOperationMessage> containerList) {
+                    System.out.println("Buffered Consumer Thread" + Thread.currentThread().getName());
                     if (containerList.size() > 0) {
                         flushMeter.mark(containerList.size());
                         Timer.Context time = flushTimer.time();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index 29f243b..d4d621f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -20,15 +20,20 @@
 package org.apache.usergrid.persistence.index.impl;
 
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import rx.Subscriber;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
 /**
  * Producer for index operation messages
  */
@@ -36,22 +41,27 @@ import rx.Subscriber;
 public class EsIndexBufferProducerImpl implements IndexBufferProducer {
 
     private final Counter indexSizeCounter;
-    private Subscriber<? super IndexOperationMessage> subscriber;
+    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+    private final Timer timer;
 
     @Inject
-    public EsIndexBufferProducerImpl(MetricsFactory metricsFactory){
-        this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class,"index.buffer.size");
-
-    }
-    @Override
-    public void call(Subscriber<? super IndexOperationMessage> subscriber) {
-        this.subscriber = subscriber;
+    public EsIndexBufferProducerImpl(MetricsFactory metricsFactory,IndexFig fig){
+        this.messages = new ArrayBlockingQueue<>(fig.getIndexQueueSize()*5);
+        this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
+        this.timer =  metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
     }
 
     public BetterFuture put(IndexOperationMessage message){
-        Preconditions.checkNotNull(message,"Message cannot be null");
+        Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc(message.getOperations().size());
-        subscriber.onNext(message);
+        Timer.Context time = timer.time();
+        messages.offer(message);
+        time.stop();
         return message.getFuture();
     }
+
+    @Override
+    public BlockingQueue<IndexOperationMessage> getSource() {
+        return messages;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 22aadc0..70ae8c5 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -96,7 +96,7 @@ public class EntityIndexTest extends BaseIT {
 
         long now = System.currentTimeMillis();
         final int threads = 20;
-        final int size = 20;
+        final int size = 30;
         final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
         final IndexScope indexScope = new IndexScopeImpl(appId, "things");
         final String entityType = "thing";