You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/26 17:12:01 UTC

incubator-usergrid git commit: switch to producer consumer model

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-273-indexbuffer f32952e8a -> 77cfeb426


switch to producer consumer model


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/77cfeb42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/77cfeb42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/77cfeb42

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 77cfeb4261aa7a3d9474c076a9ca2c49d8a121eb
Parents: f32952e
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 26 09:11:57 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 26 09:11:57 2015 -0700

----------------------------------------------------------------------
 .../persistence/index/IndexBufferConsumer.java  |  26 +++
 .../persistence/index/IndexBufferProducer.java  |  35 ++++
 .../persistence/index/guice/IndexModule.java    |  11 +-
 .../persistence/index/impl/Consumer.java        | 188 ------------------
 .../index/impl/EsEntityIndexBatchImpl.java      |   8 +-
 .../index/impl/EsEntityIndexImpl.java           |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 191 +++++++++++++++++++
 .../index/impl/EsIndexBufferProducerImpl.java   |  52 +++++
 .../impl/EsIndexBufferProducerObservable.java   |  57 ++++++
 .../index/impl/IndexBatchBufferImpl.java        |  50 -----
 .../persistence/index/impl/Producer.java        |  54 ------
 11 files changed, 374 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
new file mode 100644
index 0000000..ac7489c
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
@@ -0,0 +1,26 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+/**
+ * Marker interface for consumers of buffered index operations; it declares no methods.
+ */
+public interface IndexBufferConsumer {
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
new file mode 100644
index 0000000..c7779a1
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -0,0 +1,35 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * Producer side of the index buffer: accepts IndexOperationMessages through put() and exposes an Rx OnSubscribe source of IndexOperationMessage.
+ */
+public interface IndexBufferProducer  {
+
+    Observable.OnSubscribe<IndexOperationMessage> getObservable(); // Rx source of IndexOperationMessage; presumably emits what was handed to put() — confirm against the implementation
+
+    BetterFuture put(IndexOperationMessage message); // hands a message to the buffer; presumably the returned future completes once the message has been flushed — confirm against the implementation
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 5af148a..8e39ff7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -19,14 +19,12 @@
 
 package org.apache.usergrid.persistence.index.guice;
 
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.*;
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
-import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 
@@ -42,8 +40,7 @@ public class IndexModule extends AbstractModule {
                 .implement(EntityIndex.class, EsEntityIndexImpl.class)
                 .build(EntityIndexFactory.class));
 
-        bind(IndexBatchBuffer.class).to(IndexBatchBufferImpl.class);
-
+        bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java
deleted file mode 100644
index b34a66d..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Consumer for IndexOperationMessages
- */
-public class Consumer {
-    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
-
-    private final IndexFig config;
-    private final FailureMonitorImpl failureMonitor;
-    private final Client client;
-    private final Observable<List<IndexOperationMessage>> consumer;
-    private final Timer flushTimer;
-    private final Counter indexSizeCounter;
-
-    public Consumer(final IndexFig config,final Producer producer, final EsProvider provider,  final MetricsFactory metricsFactory){
-        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
-        this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
-        this.config = config;
-        this.failureMonitor = new FailureMonitorImpl(config,provider);
-        this.client = provider.getClient();
-
-        final AtomicLong queueSize = new AtomicLong();
-        //batch up sets of some size and send them in batch
-        this.consumer = Observable.create(producer)
-            .subscribeOn(Schedulers.io())
-            .doOnNext(new Action1<IndexOperationMessage>() {
-                @Override
-                public void call(IndexOperationMessage requestBuilderContainer) {
-                    queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
-                }
-            })
-            .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
-            .doOnNext(new Action1<List<IndexOperationMessage>>() {
-                @Override
-                public void call(List<IndexOperationMessage> containerList) {
-                    flushTimer.time();
-                    if(containerList.size()>0){
-                        execute(containerList);
-                    }
-                }
-            });
-        consumer.subscribe();
-    }
-
-    /**
-     * Execute the request, check for errors, then re-init the batch for future use
-     */
-    private void execute(final List<IndexOperationMessage> operationMessages) {
-
-        if (operationMessages == null || operationMessages.size() == 0) {
-            return;
-        }
-
-        //process and flatten all the messages to builder requests
-        Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
-            .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
-                @Override
-                public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
-                    return Observable.from(operationMessage.getBuilder())
-                        .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
-                            @Override
-                            public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
-                                return builder;
-                            }
-                        });
-                }
-            });
-
-        //batch shard operations into a bulk request
-        flattenMessages
-            .buffer(config.getIndexBatchSize())
-            .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
-                @Override
-                public void call(List<ShardReplicationOperationRequestBuilder> builders) {
-                    final BulkRequestBuilder bulkRequest = initRequest();
-                    for (ShardReplicationOperationRequestBuilder builder : builders) {
-                        indexSizeCounter.dec();
-                        if (builder instanceof IndexRequestBuilder) {
-                            bulkRequest.add((IndexRequestBuilder) builder);
-                        }
-                        if (builder instanceof DeleteRequestBuilder) {
-                            bulkRequest.add((DeleteRequestBuilder) builder);
-                        }
-                    }
-                    sendRequest(bulkRequest);
-                }
-            })
-            .toBlocking().lastOrDefault(null);
-
-        //call back all futures
-        Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
-            .doOnNext(new Action1<IndexOperationMessage>() {
-                @Override
-                public void call(IndexOperationMessage operationMessage) {
-                    operationMessage.getFuture().done();
-                }
-            })
-            .toBlocking().lastOrDefault(null);
-    }
-
-    /**
-     * initialize request
-     * @return
-     */
-    private BulkRequestBuilder initRequest() {
-        BulkRequestBuilder bulkRequest = client.prepareBulk();
-        bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
-        bulkRequest.setRefresh(config.isForcedRefresh());
-        return bulkRequest;
-    }
-
-    /**
-     * send bulk request
-     * @param bulkRequest
-     */
-    private void sendRequest(BulkRequestBuilder bulkRequest) {
-        //nothing to do, we haven't added anthing to the index
-        if (bulkRequest.numberOfActions() == 0) {
-            return;
-        }
-
-        final BulkResponse responses;
-
-        try {
-            responses = bulkRequest.execute().actionGet();
-        } catch (Throwable t) {
-            log.error("Unable to communicate with elasticsearch");
-            failureMonitor.fail("Unable to execute batch", t);
-            throw t;
-        }
-
-        failureMonitor.success();
-
-        for (BulkItemResponse response : responses) {
-            if (response.isFailed()) {
-                throw new RuntimeException("Unable to index documents.  Errors are :"
-                    + response.getFailure().getMessage());
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 455dba6..2e0fb56 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -76,18 +76,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
 
-    private final IndexBatchBuffer indexBatchBuffer;
+    private final IndexBufferProducer indexBatchBufferProducer;
 
     private final AliasedEntityIndex entityIndex;
     private IndexOperationMessage container;
 
 
-    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
+    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBufferProducer indexBatchBufferProducer,
             final IndexFig config, final AliasedEntityIndex entityIndex ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
-        this.indexBatchBuffer = indexBatchBuffer;
+        this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.entityIndex = entityIndex;
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
@@ -204,7 +204,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public BetterFuture execute() {
         IndexOperationMessage tempContainer = container;
         container = new IndexOperationMessage();
-        return indexBatchBuffer.put(tempContainer);
+        return indexBatchBufferProducer.put(tempContainer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 693b168..ad638c6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -87,7 +87,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
-    private final IndexBatchBuffer indexBatchBuffer;
+    private final IndexBufferProducer indexBatchBufferProducer;
 
     /**
      * We purposefully make this per instance. Some indexes may work, while others may fail
@@ -119,8 +119,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
 
     @Inject
-    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBatchBuffer indexBatchBuffer, final EsProvider provider, final EsIndexCache indexCache) {
-        this.indexBatchBuffer = indexBatchBuffer;
+    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final EsIndexCache indexCache) {
+        this.indexBatchBufferProducer = indexBatchBufferProducer;
         ValidationUtils.validateApplicationScope( appScope );
         this.applicationScope = appScope;
         this.esProvider = provider;
@@ -282,7 +282,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     @Override
     public EntityIndexBatch createBatch() {
         EntityIndexBatch batch = new EsEntityIndexBatchImpl(
-                applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
+                applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this );
         return batch;
     }
 
@@ -434,7 +434,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     public void refresh() {
 
-        BetterFuture future = indexBatchBuffer.put(new IndexOperationMessage());
+        BetterFuture future = indexBatchBufferProducer.put(new IndexOperationMessage());
         future.get();
         //loop through all batches and retrieve promises and call get
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
new file mode 100644
index 0000000..af61824
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -0,0 +1,191 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Consumer for IndexOperationMessages: buffers messages from the producer by time and count, then flushes each buffer to Elasticsearch as bulk requests.
+ */
+public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
+    private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class);
+
+    private final IndexFig config;
+    private final FailureMonitorImpl failureMonitor;
+    private final Client client;
+    private final Observable<List<IndexOperationMessage>> consumer;
+    private final Timer flushTimer;
+    private final Counter indexSizeCounter;
+
+    public EsIndexBufferConsumerImpl(final IndexFig config, final EsIndexBufferProducerObservable producer, final EsProvider provider, final MetricsFactory metricsFactory){
+        this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
+        this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
+        this.config = config;
+        this.failureMonitor = new FailureMonitorImpl(config,provider);
+        this.client = provider.getClient();
+
+        final AtomicLong queueSize = new AtomicLong(); // NOTE(review): only added to below and never read anywhere in this class
+        //collect messages until the buffer timeout elapses or the buffer size is reached, then flush them together
+        this.consumer = Observable.create(producer)
+            .subscribeOn(Schedulers.io())
+            .doOnNext(new Action1<IndexOperationMessage>() {
+                @Override
+                public void call(IndexOperationMessage requestBuilderContainer) {
+                    queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
+                }
+            })
+            .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
+            .doOnNext(new Action1<List<IndexOperationMessage>>() {
+                @Override
+                public void call(List<IndexOperationMessage> containerList) {
+                    flushTimer.time(); // NOTE(review): the Timer.Context returned by time() is discarded and never stopped, so no flush duration appears to be recorded — confirm
+                    if(containerList.size()>0){ // skip empty buffers
+                        execute(containerList);
+                    }
+                }
+            });
+        consumer.subscribe(); // NOTE(review): subscribes from the constructor with no onError handler; an exception thrown by execute() would presumably terminate the stream — confirm
+    }
+
+    /**
+     * Flatten the messages into shard requests, send them in bulk batches, then mark every message's future as done
+     */
+    private void execute(final List<IndexOperationMessage> operationMessages) {
+
+        if (operationMessages == null || operationMessages.size() == 0) {
+            return;
+        }
+
+        //process and flatten all the messages to builder requests
+        Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
+            .subscribeOn(Schedulers.io())
+            .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
+                @Override
+                public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
+                    return Observable.from(operationMessage.getBuilder())
+                        .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() { // identity mapping: each builder passes through unchanged
+                            @Override
+                            public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
+                                return builder;
+                            }
+                        });
+                }
+            });
+
+        //group shard operations into lists of at most config.getIndexBatchSize() and send each list as one bulk request
+        flattenMessages
+            .buffer(config.getIndexBatchSize())
+            .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+                @Override
+                public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+                    final BulkRequestBuilder bulkRequest = initRequest();
+                    for (ShardReplicationOperationRequestBuilder builder : builders) { // only index and delete builders are added; any other builder type is skipped
+                        indexSizeCounter.dec(); // NOTE(review): no matching inc() is visible in this class — confirm the counter is incremented elsewhere
+                        if (builder instanceof IndexRequestBuilder) {
+                            bulkRequest.add((IndexRequestBuilder) builder);
+                        }
+                        if (builder instanceof DeleteRequestBuilder) {
+                            bulkRequest.add((DeleteRequestBuilder) builder);
+                        }
+                    }
+                    sendRequest(bulkRequest);
+                }
+            })
+            .toBlocking().lastOrDefault(null); // block until every batch has been processed
+
+        //call back all futures; NOTE(review): presumably not reached when sendRequest throws above, leaving those futures incomplete — confirm
+        Observable.from(operationMessages)
+            .subscribeOn(Schedulers.io())
+            .doOnNext(new Action1<IndexOperationMessage>() {
+                @Override
+                public void call(IndexOperationMessage operationMessage) {
+                    operationMessage.getFuture().done();
+                }
+            })
+            .toBlocking().lastOrDefault(null);
+    }
+
+    /**
+     * Create a bulk request configured with the write consistency level and refresh flag read from config
+     * @return a new BulkRequestBuilder obtained from the client
+     */
+    private BulkRequestBuilder initRequest() {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+        bulkRequest.setRefresh(config.isForcedRefresh());
+        return bulkRequest;
+    }
+
+    /**
+     * Send the bulk request, report the outcome to the failure monitor, and throw if an item in the response failed
+     * @param bulkRequest the request to execute; a request with no actions is ignored
+     */
+    private void sendRequest(BulkRequestBuilder bulkRequest) {
+        //nothing to do, we haven't added anything to the index
+        if (bulkRequest.numberOfActions() == 0) {
+            return;
+        }
+
+        final BulkResponse responses;
+
+        try {
+            responses = bulkRequest.execute().actionGet();
+        } catch (Throwable t) {
+            log.error("Unable to communicate with elasticsearch"); // NOTE(review): the throwable is not passed to the logger; it is only handed to failureMonitor and rethrown
+            failureMonitor.fail("Unable to execute batch", t);
+            throw t;
+        }
+
+        failureMonitor.success(); // recorded before the per-item failures are checked below
+
+        for (BulkItemResponse response : responses) { // throws on the first failed item; any later failures are not reported
+            if (response.isFailed()) {
+                throw new RuntimeException("Unable to index documents.  Errors are :"
+                    + response.getFailure().getMessage());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
new file mode 100644
index 0000000..715bb84
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+
+/**
+ * Creates the producer observable and the consumer constructed around it, and delegates put() to the producer.
+ */
+@Singleton
+public class EsIndexBufferProducerImpl implements IndexBufferProducer {
+    private final IndexBufferConsumer consumer; // assigned in the constructor, not read anywhere in this class
+    private final EsIndexBufferProducerObservable producer;
+
+    @Inject
+    public EsIndexBufferProducerImpl(final MetricsFactory metricsFactory, final IndexFig config, final EsProvider provider){
+        this.producer = new EsIndexBufferProducerObservable(metricsFactory);
+        this.consumer = new EsIndexBufferConsumerImpl(config,producer,provider,metricsFactory); // built on this producer
+    }
+
+    @Override
+    public Observable.OnSubscribe<IndexOperationMessage> getObservable() {
+        return producer;
+    }
+
+    @Override
+    public BetterFuture put(IndexOperationMessage container){
+        return producer.put(container);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java
new file mode 100644
index 0000000..4a70bca
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * Rx source of index operation messages: put() pushes each message to the subscriber registered through call().
+ */
+@Singleton
+public class EsIndexBufferProducerObservable implements Observable.OnSubscribe<IndexOperationMessage> {
+
+    private final Counter indexSizeCounter;
+    private volatile Subscriber<? super IndexOperationMessage> subscriber; // set by call(), read by put(); volatile for visibility
+
+    @Inject
+    public EsIndexBufferProducerObservable(MetricsFactory metricsFactory){
+        this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerObservable.class,"index.buffer.size");
+    }
+
+    /** Stores the subscriber that later put() calls will feed; a newer subscriber replaces the previous one. */
+    @Override
+    public void call(Subscriber<? super IndexOperationMessage> subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    /** Adds message.getBuilder().size() to the counter, emits the message (NPE if call() has not run yet), returns its future. */
+    public BetterFuture put(IndexOperationMessage message){
+        indexSizeCounter.inc(message.getBuilder().size());
+        subscriber.onNext(message);
+        return message.getFuture();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
deleted file mode 100644
index dd95de7..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Buffer index requests into sets to send.
- */
-@Singleton
-public class IndexBatchBufferImpl implements IndexBatchBuffer {
-    private Consumer consumer;
-    private Producer producer;
-
-    @Inject
-    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
-        this.producer = new Producer(metricsFactory);
-        this.consumer = new Consumer(config,producer,provider,metricsFactory);
-    }
-
-    @Override
-    public BetterFuture put(IndexOperationMessage container){
-        return producer.put(container);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java
deleted file mode 100644
index 06a1138..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import rx.Observable;
-import rx.Subscriber;
-
-/**
- * Producer for index operation messages
- */
-public class Producer implements Observable.OnSubscribe<IndexOperationMessage> {
-
-    private final MetricsFactory metricsFactory;
-    private final Counter indexSizeCounter;
-    private Subscriber<? super IndexOperationMessage> subscriber;
-
-    public Producer(MetricsFactory metricsFactory){
-        this.metricsFactory = metricsFactory;
-        this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size");
-
-    }
-    @Override
-    public void call(Subscriber<? super IndexOperationMessage> subscriber) {
-        this.subscriber = subscriber;
-    }
-
-    public BetterFuture put(IndexOperationMessage message){
-        indexSizeCounter.inc(message.getBuilder().size());
-        subscriber.onNext(message);
-        return message.getFuture();
-    }
-}