You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/01 18:27:44 UTC

[09/36] usergrid git commit: change consumer to producer

change consumer to producer


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5218cda9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5218cda9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5218cda9

Branch: refs/heads/master
Commit: 5218cda9d5732ae4ee2ee2ce1e7ae297975a7fad
Parents: a5ece51
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 16:17:13 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:01:20 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/guice/IndexModule.java    |   2 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |   4 +-
 .../index/impl/EsEntityIndexFactoryImpl.java    |   8 +-
 .../index/impl/EsEntityIndexImpl.java           |   6 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 216 -------------------
 .../index/impl/EsIndexProducerImpl.java         | 209 ++++++++++++++++++
 .../index/impl/IndexBufferConsumer.java         |  40 ----
 .../persistence/index/impl/IndexProducer.java   |  37 ++++
 .../index/impl/IndexRefreshCommandImpl.java     |   5 +-
 9 files changed, 257 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 7279174..46559ad 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -52,7 +52,7 @@ public abstract class IndexModule extends AbstractModule {
         bind(IndexCache.class).to(EsIndexCacheImpl.class);
         bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class);
 
-        bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
+        bind(IndexProducer.class).to(EsIndexProducerImpl.class).asEagerSingleton();
 
 
         //wire up the edg migration. A no-op ATM, but retained for future development

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index c11feed..64a1c6a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -41,7 +41,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexAlias alias;
 
     private final IndexLocationStrategy indexLocationStrategy;
-    private final IndexBufferConsumer indexBatchBufferProducer;
+    private final IndexProducer indexBatchBufferProducer;
 
     private final EntityIndex entityIndex;
     private final ApplicationScope applicationScope;
@@ -49,7 +49,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
-                                   final IndexBufferConsumer indexBatchBufferProducer,
+                                   final IndexProducer indexBatchBufferProducer,
                                    final EntityIndex entityIndex
     ) {
         this.indexLocationStrategy = locationStrategy;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index 869d079..b66fd40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -37,7 +37,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     private final IndexFig config;
     private final IndexCache indexCache;
     private final EsProvider provider;
-    private final IndexBufferConsumer indexBufferConsumer;
+    private final IndexProducer indexProducer;
     private final MetricsFactory metricsFactory;
     private final IndexRefreshCommand refreshCommand;
 
@@ -50,7 +50,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
                     config,
                     refreshCommand,
                     metricsFactory,
-                    indexBufferConsumer,
+                    indexProducer,
                     locationStrategy
                 );
                 index.initialize();
@@ -62,7 +62,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     public EsEntityIndexFactoryImpl( final IndexFig indexFig,
                                      final IndexCache indexCache,
                                      final EsProvider provider,
-                                     final IndexBufferConsumer indexBufferConsumer,
+                                     final IndexProducer indexProducer,
                                      final MetricsFactory metricsFactory,
                                      final IndexRefreshCommand refreshCommand
 
@@ -70,7 +70,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
         this.config = indexFig;
         this.indexCache = indexCache;
         this.provider = provider;
-        this.indexBufferConsumer = indexBufferConsumer;
+        this.indexProducer = indexProducer;
         this.metricsFactory = metricsFactory;
         this.refreshCommand = refreshCommand;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 1c63a7b..6317a69 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -63,8 +63,6 @@ import org.elasticsearch.index.query.*;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
 import org.slf4j.Logger;
@@ -118,7 +116,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
     private final int cursorTimeout;
     private final long queryTimeout;
-    private final IndexBufferConsumer indexBatchBufferProducer;
+    private final IndexProducer indexBatchBufferProducer;
     private final FailureMonitorImpl failureMonitor;
     private final Timer aggregationTimer;
 
@@ -133,7 +131,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
                               final IndexFig indexFig,
                               final IndexRefreshCommand indexRefreshCommand,
                               final MetricsFactory metricsFactory,
-                              final IndexBufferConsumer indexBatchBufferProducer,
+                              final IndexProducer indexBatchBufferProducer,
                               final IndexLocationStrategy indexLocationStrategy
     ) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
deleted file mode 100644
index d126b5d..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.codahale.metrics.Histogram;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexFig;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Consumer for IndexOperationMessages
- */
-@Singleton
-public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
-    private static final Logger log = LoggerFactory.getLogger( EsIndexBufferConsumerImpl.class );
-
-    private final IndexFig config;
-    private final FailureMonitorImpl failureMonitor;
-    private final Client client;
-    private final Timer flushTimer;
-    private final IndexFig indexFig;
-    private final Counter indexSizeCounter;
-    private final Histogram roundtripTimer;
-    private final Timer indexTimer;
-
-
-    private AtomicLong inFlight = new AtomicLong();
-
-
-    @Inject
-    public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider,
-                                      final MetricsFactory metricsFactory, final IndexFig indexFig ) {
-        this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
-        this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
-        this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
-
-        //wire up the gauge of inflight messages
-        metricsFactory.addGauge(EsIndexBufferConsumerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
-
-
-        this.indexTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index" );
-
-        this.config = config;
-        this.failureMonitor = new FailureMonitorImpl(config, provider);
-        this.client = provider.getClient();
-        this.indexFig = indexFig;
-
-
-        //batch up sets of some size and send them in batch
-
-    }
-
-    public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
-        Preconditions.checkNotNull(message, "Message cannot be null");
-        indexSizeCounter.inc(message.getDeIndexRequests().size());
-        indexSizeCounter.inc(message.getIndexRequests().size());
-        return  processBatch(message);
-    }
-
-
-    /**
-     * Process the buffer of batches
-     * @param batch
-     * @return
-     */
-    private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
-
-        //take our stream of batches, then stream then into individual ops for consumption on ES
-        final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
-        final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
-
-        final int indexOperationSetSize = indexOperationSet.size();
-        final int deIndexOperationSetSize = deIndexOperationSet.size();
-
-        log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
-
-        indexSizeCounter.dec(indexOperationSetSize);
-        indexSizeCounter.dec(deIndexOperationSetSize);
-
-        final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
-        final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
-
-        final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
-
-        //buffer into the max size we can send ES and fire them all off until we're completed
-        final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
-            //flatten the buffer into a single batch execution
-            .flatMap(individualOps -> Observable.from(individualOps)
-                //collect them
-                .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
-                    log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
-                    batchOperation.doOperation(client, bulkRequestBuilder);
-                }))
-                //write them
-            .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
-
-
-        //now that we've processed them all, ack the futures after our last batch comes through
-        final Observable<IndexOperationMessage> processedIndexOperations =
-            requests.lastOrDefault(null).flatMap(lastRequest -> {
-                if (lastRequest != null) {
-                    return Observable.just(batch);
-                } else {
-                    return Observable.empty();
-                }
-            });
-
-        //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
-        //mark this as done
-        return processedIndexOperations.doOnNext(processedIndexOp -> {
-            roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
-        });
-    }
-
-
-    /*
-
-    /**
-     * initialize request
-     */
-    private BulkRequestBuilder initRequest() {
-        BulkRequestBuilder bulkRequest = client.prepareBulk();
-        bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
-        bulkRequest.setRefresh( config.isForcedRefresh() );
-        return bulkRequest;
-    }
-
-
-    /**
-     * send bulk request
-     */
-    private void sendRequest( BulkRequestBuilder bulkRequest ) {
-        //nothing to do, we haven't added anything to the index
-        if ( bulkRequest.numberOfActions() == 0 ) {
-            return;
-        }
-
-        final BulkResponse responses;
-
-
-        final Timer.Context timer = indexTimer.time();
-
-        try {
-            responses = bulkRequest.execute().actionGet( );
-        } catch ( Throwable t ) {
-            log.error( "Unable to communicate with elasticsearch" );
-            failureMonitor.fail( "Unable to execute batch", t );
-            throw t;
-        }finally{
-            timer.stop();
-        }
-
-        failureMonitor.success();
-
-        boolean error = false;
-
-        for ( BulkItemResponse response : responses ) {
-
-            if ( response.isFailed() ) {
-                // log error and continue processing
-                log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
-                    response.getType(), response.getIndex(), response.getFailureMessage() );
-
-                error = true;
-            }
-        }
-
-        if ( error ) {
-            throw new RuntimeException(
-                "Error during processing of bulk index operations one of the responses failed.  Check previous log "
-                    + "entries" );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
new file mode 100644
index 0000000..a2c8663
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Histogram;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Producer for IndexOperationMessages
+ */
+@Singleton
+public class EsIndexProducerImpl implements IndexProducer {
+    private static final Logger log = LoggerFactory.getLogger( EsIndexProducerImpl.class );
+
+    private final IndexFig config;
+    private final FailureMonitorImpl failureMonitor;
+    private final Client client;
+    private final Timer flushTimer;
+    private final IndexFig indexFig;
+    private final Counter indexSizeCounter;
+    private final Histogram roundtripTimer;
+    private final Timer indexTimer;
+
+
+    private AtomicLong inFlight = new AtomicLong();
+
+
+    @Inject
+    public EsIndexProducerImpl(final IndexFig config, final EsProvider provider,
+                               final MetricsFactory metricsFactory, final IndexFig indexFig) {
+        this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, "index_buffer.flush");
+        this.indexSizeCounter = metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size");
+        this.roundtripTimer = metricsFactory.getHistogram(EsIndexProducerImpl.class, "index_buffer.message_cycle");
+
+        //wire up the gauge of inflight messages
+        metricsFactory.addGauge(EsIndexProducerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
+
+
+        this.indexTimer = metricsFactory.getTimer( EsIndexProducerImpl.class, "index" );
+
+        this.config = config;
+        this.failureMonitor = new FailureMonitorImpl(config, provider);
+        this.client = provider.getClient();
+        this.indexFig = indexFig;
+
+
+        //batch up sets of some size and send them in batch
+
+    }
+
+    public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
+        Preconditions.checkNotNull(message, "Message cannot be null");
+        indexSizeCounter.inc(message.getDeIndexRequests().size());
+        indexSizeCounter.inc(message.getIndexRequests().size());
+        return  processBatch(message);
+    }
+
+
+    /**
+     * Process the buffer of batches
+     * @param batch
+     * @return
+     */
+    private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
+
+        //take our stream of batches, then stream them into individual ops for consumption on ES
+        final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+        final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+        final int indexOperationSetSize = indexOperationSet.size();
+        final int deIndexOperationSetSize = deIndexOperationSet.size();
+
+        log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
+
+        indexSizeCounter.dec(indexOperationSetSize);
+        indexSizeCounter.dec(deIndexOperationSetSize);
+
+        final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+        final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
+
+        final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
+
+        //buffer into the max size we can send ES and fire them all off until we're completed
+        final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
+            //flatten the buffer into a single batch execution
+            .flatMap(individualOps -> Observable.from(individualOps)
+                //collect them
+                .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+                    log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+                    batchOperation.doOperation(client, bulkRequestBuilder);
+                }))
+                //write them
+            .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
+
+
+        //now that we've processed them all, ack the futures after our last batch comes through
+        final Observable<IndexOperationMessage> processedIndexOperations =
+            requests.lastOrDefault(null).flatMap(lastRequest -> {
+                if (lastRequest != null) {
+                    return Observable.just(batch);
+                } else {
+                    return Observable.empty();
+                }
+            });
+
+        //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
+        //mark this as done
+        return processedIndexOperations.doOnNext(processedIndexOp -> {
+            roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+        });
+    }
+
+
+    /*
+
+    /**
+     * initialize request
+     */
+    private BulkRequestBuilder initRequest() {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
+        bulkRequest.setRefresh( config.isForcedRefresh() );
+        return bulkRequest;
+    }
+
+
+    /**
+     * send bulk request
+     */
+    private void sendRequest( BulkRequestBuilder bulkRequest ) {
+        //nothing to do, we haven't added anything to the index
+        if ( bulkRequest.numberOfActions() == 0 ) {
+            return;
+        }
+
+        final BulkResponse responses;
+
+
+        final Timer.Context timer = indexTimer.time();
+
+        try {
+            responses = bulkRequest.execute().actionGet( );
+        } catch ( Throwable t ) {
+            log.error( "Unable to communicate with elasticsearch" );
+            failureMonitor.fail( "Unable to execute batch", t );
+            throw t;
+        }finally{
+            timer.stop();
+        }
+
+        failureMonitor.success();
+
+        boolean error = false;
+
+        for ( BulkItemResponse response : responses ) {
+
+            if ( response.isFailed() ) {
+                // log error and continue processing
+                log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
+                    response.getType(), response.getIndex(), response.getFailureMessage() );
+
+                error = true;
+            }
+        }
+
+        if ( error ) {
+            throw new RuntimeException(
+                "Error during processing of bulk index operations one of the responses failed.  Check previous log "
+                    + "entries" );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
deleted file mode 100644
index cfeb505..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import rx.Observable;
-
-import java.util.List;
-
-
-/**
- *  Buffer index requests
- */
-public interface IndexBufferConsumer {
-
-
-    /**
-     * Put this operation into our collapsing bufer
-     * @param message
-     * @return
-     */
-    Observable<IndexOperationMessage>  put(IndexOperationMessage message);
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
new file mode 100644
index 0000000..ba7027e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import rx.Observable;
+
+
+/**
+ *  Buffer index requests
+ */
+public interface IndexProducer {
+
+
+    /**
+     * Put this operation into our collapsing buffer
+     * @param message
+     * @return
+     */
+    Observable<IndexOperationMessage>  put(IndexOperationMessage message);
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 7b9bc5d..01942a8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 
@@ -58,7 +57,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
     private final IndexCache indexCache;
     private final EsProvider esProvider;
-    private final IndexBufferConsumer producer;
+    private final IndexProducer producer;
     private final IndexFig indexFig;
     private final Timer timer;
 
@@ -66,7 +65,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
     @Inject
     public IndexRefreshCommandImpl(
                                     final EsProvider esProvider,
-                                    final IndexBufferConsumer producer,
+                                    final IndexProducer producer,
                                     final IndexFig indexFig,
                                     final MetricsFactory metricsFactory,
                                     final IndexCache indexCache ) {