You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/01 18:27:39 UTC
[04/36] usergrid git commit: remove batch consumer
remove batch consumer
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76816007
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76816007
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76816007
Branch: refs/heads/master
Commit: 76816007f34317e54659aaad3ac5c1bf59e8580d
Parents: ce5a96f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:11:12 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:13 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/IndexFig.java | 14 ----
.../index/impl/EsIndexBufferConsumerImpl.java | 87 +++++++++-----------
.../index/impl/IndexBufferConsumer.java | 2 +
.../index/impl/IndexRefreshCommandImpl.java | 1 +
4 files changed, 44 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 5997029..db1ef3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -153,21 +153,7 @@ public interface IndexFig extends GuicyFig {
@Default( "2" )
int getIndexCacheMaxWorkers();
- /**
- * The maximum time to wait before the buffer flushes and sends index write requests to Elasticsearch.
- * This is used so the application doesn't wait forever for the buffer to reach its size before writing
- * data to Elasticsearch.
- */
- @Default( "250" )
- @Key( INDEX_BUFFER_TIMEOUT )
- long getIndexBufferTimeout();
- /**
- * The maximum buffer size to use before sending index write requests to Elasticsearch.
- */
- @Default( "1000" )
- @Key( INDEX_BUFFER_SIZE )
- int getIndexBufferSize();
/**
* The number of worker threads used for flushing batches of index write requests
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 14f2b6f..91fab2f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -100,12 +100,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
Preconditions.checkNotNull(message, "Message cannot be null");
- indexSizeCounter.inc( message.getDeIndexRequests().size() );
- indexSizeCounter.inc( message.getIndexRequests().size() );
+ indexSizeCounter.inc(message.getDeIndexRequests().size());
+ indexSizeCounter.inc(message.getIndexRequests().size());
Timer.Context time = offerTimer.time();
- bufferProducer.send( message );
+ bufferProducer.send(message);
time.stop();
- return message.observable();
+ return message.observable();
}
@@ -114,22 +114,20 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
*/
private void startSubscription() {
-
+ //buffer on our new thread with a timeout
final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
- //buffer on our new thread with a timeout
- observable.buffer( indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, indexFig.getIndexBufferSize(),
- Schedulers.io() ).flatMap( indexOpBuffer -> {
+ observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
//hand off to processor in new observable thread so we can continue to buffer faster
- return Observable.just( indexOpBuffer ).flatMap(
- indexOpBufferObservable -> ObservableTimer.time( processBatch( indexOpBufferObservable ),flushTimer )
+ return Observable.just(indexOpBuffer).flatMap(
+ indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
)
//use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent
// flatmap count or higher throughput of batches once buffered
- .subscribeOn( Schedulers.io() );
- }, indexFig.getIndexFlushWorkerCount() )
+ .subscribeOn(Schedulers.io());
+ }, indexFig.getIndexFlushWorkerCount())
//start in the background
.subscribe();
}
@@ -137,63 +135,60 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
/**
* Process the buffer of batches
- * @param batches
+ * @param batch
* @return
*/
- private Observable<IndexOperationMessage> processBatch( final List<IndexOperationMessage> batches ) {
+ private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
- final Observable<IndexOperationMessage> indexOps = Observable.from( batches );
-
//take our stream of batches, then stream then into individual ops for consumption on ES
- final Observable<BatchOperation> batchOps = indexOps.flatMap( batch -> {
- final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
- final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
- final int indexOperationSetSize = indexOperationSet.size();
- final int deIndexOperationSetSize = deIndexOperationSet.size();
+ final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+ final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+ final int indexOperationSetSize = indexOperationSet.size();
+ final int deIndexOperationSetSize = deIndexOperationSet.size();
- log.debug( "Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize );
+ log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
- indexSizeCounter.dec( indexOperationSetSize );
- indexSizeCounter.dec( deIndexOperationSetSize );
+ indexSizeCounter.dec(indexOperationSetSize);
+ indexSizeCounter.dec(deIndexOperationSetSize);
- final Observable<IndexOperation> index = Observable.from( batch.getIndexRequests() );
- final Observable<DeIndexOperation> deIndex = Observable.from( batch.getDeIndexRequests() );
+ final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+ final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
- return Observable.merge( index, deIndex );
- } );
+ final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
//buffer into the max size we can send ES and fire them all off until we're completed
- final Observable<BulkRequestBuilder> requests = batchOps.buffer( indexFig.getIndexBatchSize() )
+ final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
//flatten the buffer into a single batch execution
- .flatMap( individualOps -> Observable.from( individualOps )
+ .flatMap(individualOps -> Observable.from(individualOps)
//collect them
- .collect( () -> initRequest(), ( bulkRequestBuilder, batchOperation ) -> {
- log.debug( "adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder );
- batchOperation.doOperation( client, bulkRequestBuilder );
- } ) )
+ .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+ log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+ batchOperation.doOperation(client, bulkRequestBuilder);
+ }))
//write them
- .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) );
+ .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
//now that we've processed them all, ack the futures after our last batch comes through
final Observable<IndexOperationMessage> processedIndexOperations =
- requests.lastOrDefault(null).flatMap( lastRequest ->{
- if(lastRequest!=null){
- return Observable.from( batches ) ;
- }else{
+ requests.lastOrDefault(null).flatMap(lastRequest -> {
+ if (lastRequest != null) {
+ return Observable.just(batch);
+ } else {
return Observable.empty();
}
});
//subscribe to the operations that generate requests on a new thread so that we can execute them quickly
//mark this as done
- return processedIndexOperations.doOnNext( processedIndexOp -> {
- processedIndexOp.done();
- roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() );
- } );
+ return processedIndexOperations.doOnNext(processedIndexOp -> {
+ processedIndexOp.done();
+ roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+ });
}
@@ -266,12 +261,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
/**
* Send the data through the buffer
*/
- public void send( final IndexOperationMessage indexOp ) {
+ public void send( final IndexOperationMessage indexOps ) {
try {
- subscriber.onNext( indexOp );
+ subscriber.onNext( indexOps );
}catch(Exception e){
//re-throws so the caller can determine failover
- log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e );
+ log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index e769455..df2119c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.index.impl;
import rx.Observable;
+import java.util.List;
+
/**
* Buffer index requests
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 70220d0..7b9bc5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.index.impl;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;