You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/01 18:27:39 UTC

[04/36] usergrid git commit: remove batch consumer

remove batch consumer


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76816007
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76816007
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76816007

Branch: refs/heads/master
Commit: 76816007f34317e54659aaad3ac5c1bf59e8580d
Parents: ce5a96f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:11:12 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:13 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    | 14 ----
 .../index/impl/EsIndexBufferConsumerImpl.java   | 87 +++++++++-----------
 .../index/impl/IndexBufferConsumer.java         |  2 +
 .../index/impl/IndexRefreshCommandImpl.java     |  1 +
 4 files changed, 44 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 5997029..db1ef3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -153,21 +153,7 @@ public interface IndexFig extends GuicyFig {
     @Default( "2" )
     int getIndexCacheMaxWorkers();
 
-    /**
-     * The maximum time to wait before the buffer flushes and sends index write requests to Elasticsearch.
-     * This is used so the application doesn't wait forever for the buffer to reach its size before writing
-     * data to Elasticsearch.
-     */
-    @Default( "250" )
-    @Key( INDEX_BUFFER_TIMEOUT )
-    long getIndexBufferTimeout();
 
-    /**
-     * The maximum buffer size to use before sending index write requests to Elasticsearch.
-     */
-    @Default( "1000" )
-    @Key( INDEX_BUFFER_SIZE )
-    int getIndexBufferSize();
 
     /**
      * The number of worker threads used for flushing batches of index write requests

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 14f2b6f..91fab2f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -100,12 +100,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
     public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
         Preconditions.checkNotNull(message, "Message cannot be null");
-        indexSizeCounter.inc( message.getDeIndexRequests().size() );
-        indexSizeCounter.inc( message.getIndexRequests().size() );
+        indexSizeCounter.inc(message.getDeIndexRequests().size());
+        indexSizeCounter.inc(message.getIndexRequests().size());
         Timer.Context time = offerTimer.time();
-        bufferProducer.send( message );
+        bufferProducer.send(message);
         time.stop();
-        return message.observable();
+        return  message.observable();
     }
 
 
@@ -114,22 +114,20 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
      */
     private void startSubscription() {
 
-
+        //buffer on our new thread with a timeout
         final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
 
-        //buffer on our new thread with a timeout
-        observable.buffer( indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, indexFig.getIndexBufferSize(),
-            Schedulers.io() ).flatMap( indexOpBuffer -> {
+        observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
 
             //hand off to processor in new observable thread so we can continue to buffer faster
-            return Observable.just( indexOpBuffer ).flatMap(
-                indexOpBufferObservable -> ObservableTimer.time( processBatch( indexOpBufferObservable ),flushTimer )
+            return Observable.just(indexOpBuffer).flatMap(
+                indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
             )
 
                 //use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent
                 // flatmap count or higher throughput of batches once buffered
-                .subscribeOn( Schedulers.io() );
-        }, indexFig.getIndexFlushWorkerCount() )
+                .subscribeOn(Schedulers.io());
+        }, indexFig.getIndexFlushWorkerCount())
             //start in the background
             .subscribe();
     }
@@ -137,63 +135,60 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
     /**
      * Process the buffer of batches
-     * @param batches
+     * @param batch
      * @return
      */
-    private Observable<IndexOperationMessage> processBatch( final List<IndexOperationMessage> batches ) {
+    private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
 
 
-        final Observable<IndexOperationMessage> indexOps = Observable.from( batches );
-
         //take our stream of batches, then stream then into individual ops for consumption on ES
-        final Observable<BatchOperation> batchOps = indexOps.flatMap( batch -> {
 
-            final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
-            final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
 
-            final int indexOperationSetSize = indexOperationSet.size();
-            final int deIndexOperationSetSize = deIndexOperationSet.size();
+        final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+        final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+        final int indexOperationSetSize = indexOperationSet.size();
+        final int deIndexOperationSetSize = deIndexOperationSet.size();
 
-            log.debug( "Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize );
+        log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
 
-            indexSizeCounter.dec( indexOperationSetSize );
-            indexSizeCounter.dec( deIndexOperationSetSize );
+        indexSizeCounter.dec(indexOperationSetSize);
+        indexSizeCounter.dec(deIndexOperationSetSize);
 
-            final Observable<IndexOperation> index = Observable.from( batch.getIndexRequests() );
-            final Observable<DeIndexOperation> deIndex = Observable.from( batch.getDeIndexRequests() );
+        final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+        final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
 
-            return Observable.merge( index, deIndex );
-        } );
+        final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
 
         //buffer into the max size we can send ES and fire them all off until we're completed
-        final Observable<BulkRequestBuilder> requests = batchOps.buffer( indexFig.getIndexBatchSize() )
+        final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
             //flatten the buffer into a single batch execution
-            .flatMap( individualOps -> Observable.from( individualOps )
+            .flatMap(individualOps -> Observable.from(individualOps)
                 //collect them
-                .collect( () -> initRequest(), ( bulkRequestBuilder, batchOperation ) -> {
-                    log.debug( "adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder );
-                    batchOperation.doOperation( client, bulkRequestBuilder );
-                } ) )
+                .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+                    log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+                    batchOperation.doOperation(client, bulkRequestBuilder);
+                }))
                 //write them
-            .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) );
+            .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
 
 
         //now that we've processed them all, ack the futures after our last batch comes through
         final Observable<IndexOperationMessage> processedIndexOperations =
-            requests.lastOrDefault(null).flatMap( lastRequest ->{
-                if(lastRequest!=null){
-                    return Observable.from( batches ) ;
-                }else{
+            requests.lastOrDefault(null).flatMap(lastRequest -> {
+                if (lastRequest != null) {
+                    return Observable.just(batch);
+                } else {
                     return Observable.empty();
                 }
             });
 
         //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
         //mark this as done
-        return processedIndexOperations.doOnNext( processedIndexOp -> {
-                processedIndexOp.done();
-                roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() );
-            } );
+        return processedIndexOperations.doOnNext(processedIndexOp -> {
+            processedIndexOp.done();
+            roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+        });
     }
 
 
@@ -266,12 +261,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         /**
          * Send the data through the buffer
          */
-        public void send( final IndexOperationMessage indexOp ) {
+        public void send( final IndexOperationMessage indexOps ) {
             try {
-                subscriber.onNext( indexOp );
+                subscriber.onNext( indexOps );
             }catch(Exception e){
                 //re-throws so the caller can determine failover
-                log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e );
+                log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
                 throw e;
             }
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index e769455..df2119c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.index.impl;
 
 import rx.Observable;
 
+import java.util.List;
+
 
 /**
  *  Buffer index requests

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 70220d0..7b9bc5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;