You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/25 17:33:05 UTC

[1/2] incubator-usergrid git commit: add batches of batches

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-273-indexbuffer 3231534d7 -> ac55e0878


add batches of batches


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/420f5009
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/420f5009
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/420f5009

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 420f50093e90c2d764d7415a48b1bfb204a1e1f4
Parents: 3231534
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 24 18:57:15 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 24 18:57:15 2015 -0700

----------------------------------------------------------------------
 .../persistence/index/IndexBatchBuffer.java     | 11 +--
 .../index/RequestBuilderContainer.java          | 56 ++++++++++++
 .../index/impl/EsEntityIndexBatchImpl.java      | 40 ++-------
 .../index/impl/IndexBatchBufferImpl.java        | 90 ++++++++------------
 4 files changed, 100 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index b24d37b..1d71bcd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -34,16 +34,11 @@ public interface IndexBatchBuffer {
     /**
      * put request into buffer, retu
      *
-     * @param builder
+     * @param container
      */
-    public BetterFuture put(IndexRequestBuilder builder);
+    public BetterFuture put(RequestBuilderContainer container);
+
 
-    /**
-     * put request into buffer
-     *
-     * @param builder
-     */
-    public BetterFuture put(DeleteRequestBuilder builder);
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
new file mode 100644
index 0000000..bc5e115
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Classy class class.
+ */
+public  class RequestBuilderContainer{
+    private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
+    private final BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> containerFuture;
+
+    public RequestBuilderContainer(){
+        final RequestBuilderContainer parent = this;
+        builders = new ConcurrentLinkedQueue<>();
+        this.containerFuture = new BetterFuture<>(new Callable<Iterator<ShardReplicationOperationRequestBuilder>>() {
+            @Override
+            public Iterator<ShardReplicationOperationRequestBuilder> call() throws Exception {
+                return parent.getBuilder().iterator();
+            }
+        });
+    }
+
+    public void add(ShardReplicationOperationRequestBuilder builder){
+        builders.add(builder);
+    }
+    public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
+        return builders;
+    }
+    public void done(){
+        containerFuture.done();
+    }
+    public BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> getFuture(){
+        return containerFuture;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index e008707..dc8b1af 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -91,7 +91,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final FailureMonitor failureMonitor;
 
     private final AliasedEntityIndex entityIndex;
-    private final ConcurrentLinkedQueue<BetterFuture> promises;
+    private final RequestBuilderContainer container;
 
 
     public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -106,7 +106,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
         //constrained
-        this.promises = new ConcurrentLinkedQueue<>();
+        this.container = new RequestBuilderContainer();
     }
 
 
@@ -140,7 +140,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         final String entityType = entity.getId().getType();
         IndexRequestBuilder builder =
                 client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
-        promises.add(indexBatchBuffer.put(builder));
+        container.add(builder);
         return this;
     }
 
@@ -186,7 +186,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
                    public Object call(String index) {
                        try {
                            DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
-                           promises.add(indexBatchBuffer.put(builder));
+                           container.add(builder);
                        }catch (Exception e){
                            log.error("failed to deindex",e);
                            throw e;
@@ -197,14 +197,12 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug("Deindexed Entity with index id " + indexId);
 
-
         return this;
     }
 
 
     @Override
     public EntityIndexBatch deindex( final IndexScope indexScope, final Entity entity ) {
-
         return deindex( indexScope, entity.getId(), entity.getVersion() );
     }
 
@@ -217,40 +215,16 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public void execute() {
-       flushFutures();
+        indexBatchBuffer.put(container);
     }
 
     @Override
     public void executeAndRefresh() {
-        flushFutures();
+        BetterFuture future = indexBatchBuffer.put(container);
+        future.get();
         entityIndex.refresh();
     }
 
-    private void flushFutures() {
-        ObservableIterator<BetterFuture> iterator = new ObservableIterator<BetterFuture>("futures") {
-            @Override
-            protected Iterator<BetterFuture> getIterator() {
-                return promises.iterator();
-            }
-        };
-        Observable.create(iterator)
-                .doOnNext(new Action1<BetterFuture>() {
-                    @Override
-                    public void call(BetterFuture betterFuture) {
-                        betterFuture.get();
-                    }
-                })
-                .buffer(100)
-                .doOnNext(new Action1<List<BetterFuture>>() {
-                    @Override
-                    public void call(List<BetterFuture> betterFutures) {
-                        promises.removeAll(betterFutures);
-                    }
-                })
-                .toBlocking()
-                .lastOrDefault(null);
-    }
-
     /**
      * Set the entity as a map with the context
      *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index ad14920..364c20a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBatchBuffer;
 import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.RequestBuilderContainer;
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -38,9 +39,11 @@ import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Action0;
 import rx.functions.Action1;
+import rx.functions.Func1;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.*;
 
@@ -96,81 +99,56 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
     @Override
-    public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
-        RequestBuilderContainer container = new RequestBuilderContainer(builder);
+    public BetterFuture put(RequestBuilderContainer container){
         bufferCounter.inc();
         producer.put(container);
         return container.getFuture();
     }
 
-    @Override
-    public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
-        RequestBuilderContainer container = new RequestBuilderContainer(builder);
-        bufferCounter.inc();
-        producer.put(container);
-        return container.getFuture();
-    }
-
-
 
-    private static class RequestBuilderContainer{
-        private final ShardReplicationOperationRequestBuilder builder;
-        private final BetterFuture<ShardReplicationOperationRequestBuilder> containerFuture;
-
-        public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
-            final RequestBuilderContainer parent = this;
-            this.builder = builder;
-            this.containerFuture = new BetterFuture<>(new Callable<ShardReplicationOperationRequestBuilder>() {
-                @Override
-                public ShardReplicationOperationRequestBuilder call() throws Exception {
-                    return parent.getBuilder();
-                }
-            });
-        }
-
-        public ShardReplicationOperationRequestBuilder getBuilder(){
-            return builder;
-        }
-        private void done(){
-            containerFuture.done();
-        }
-        public BetterFuture<ShardReplicationOperationRequestBuilder> getFuture(){
-            return containerFuture;
-        }
-    }
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */
-    private void execute( boolean refresh) {
+    private void execute(final boolean refresh) {
 
         if (blockingQueue.size() == 0) {
             return;
         }
 
-        BulkRequestBuilder bulkRequest = initRequest(refresh);
 
         Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
         blockingQueue.drainTo(containerCollection);
-        int count = 0;
         //clear the queue or proceed to buffersize
-        for (RequestBuilderContainer container : containerCollection) {
-
-            ShardReplicationOperationRequestBuilder builder = container.getBuilder();
-            //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
-            if (builder instanceof IndexRequestBuilder) {
-                bulkRequest.add((IndexRequestBuilder) builder);
-            }
-            if (builder instanceof DeleteRequestBuilder) {
-                bulkRequest.add((DeleteRequestBuilder) builder);
-            }
-
-            if (count++ == config.getIndexBatchSize()) {
-                sendRequest(bulkRequest);
-                bulkRequest = initRequest(refresh);
+        Observable.from(containerCollection)
+                .flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
+                    @Override
+                    public Observable<ShardReplicationOperationRequestBuilder> call(RequestBuilderContainer requestBuilderContainer) {
+                        return Observable.from(requestBuilderContainer.getBuilder())
+                                .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
+                                    @Override
+                                    public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
+                                        return builder;
+                                    }
+                                });
+                    }
+                })
+                .buffer(config.getIndexBatchSize())
+                .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+                    @Override
+                    public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+                        final BulkRequestBuilder bulkRequest = initRequest(refresh);
+                        for(ShardReplicationOperationRequestBuilder builder : builders) {
+                            if (builder instanceof IndexRequestBuilder) {
+                                bulkRequest.add((IndexRequestBuilder) builder);
+                            }
+                            if (builder instanceof DeleteRequestBuilder) {
+                                bulkRequest.add((DeleteRequestBuilder) builder);
+                            }
+                            sendRequest(bulkRequest);
+                        }
+                    }
+                }).toBlocking().lastOrDefault(null);
 
-            }
-        }
-        sendRequest(bulkRequest);
         for (RequestBuilderContainer container : containerCollection) {
             container.done();
         }


[2/2] incubator-usergrid git commit: Add batch of batches

Posted by sf...@apache.org.
Add batch of batches


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac55e087
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac55e087
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac55e087

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: ac55e0878bc5a73deb8e07137f5394cb85b409a0
Parents: 420f500
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 09:32:48 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 09:32:48 2015 -0700

----------------------------------------------------------------------
 .../persistence/index/EntityIndexBatch.java     |  3 +-
 .../index/RequestBuilderContainer.java          |  9 ++++
 .../index/impl/EsEntityIndexBatchImpl.java      | 19 +++++----
 .../index/impl/EsEntityIndexImpl.java           |  2 +-
 .../index/impl/IndexBatchBufferImpl.java        | 44 ++++++++++++--------
 .../persistence/index/impl/EntityIndexTest.java |  9 ++--
 6 files changed, 55 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index f5b8abc..f3f9100 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.index;/*
 
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -64,7 +65,7 @@ public interface EntityIndexBatch {
     /**
      * Execute the batch
      */
-    public void execute();
+    public BetterFuture execute();
 
     /**
      * Execute the batch and force the refresh

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
index bc5e115..e7928ff 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
@@ -30,6 +30,8 @@ public  class RequestBuilderContainer{
     private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
     private final BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> containerFuture;
 
+    private  boolean forceRefresh = false;
+
     public RequestBuilderContainer(){
         final RequestBuilderContainer parent = this;
         builders = new ConcurrentLinkedQueue<>();
@@ -53,4 +55,11 @@ public  class RequestBuilderContainer{
     public BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> getFuture(){
         return containerFuture;
     }
+
+    public boolean isForceRefresh() {
+        return forceRefresh;
+    }
+    public void setForceRefresh(boolean forceRefresh) {
+        this.forceRefresh = forceRefresh;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index dc8b1af..4fcdeb3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -88,19 +88,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexIdentifier indexIdentifier;
 
     private final IndexBatchBuffer indexBatchBuffer;
-    private final FailureMonitor failureMonitor;
 
     private final AliasedEntityIndex entityIndex;
-    private final RequestBuilderContainer container;
+    private RequestBuilderContainer container;
 
 
     public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
-            final IndexFig config, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
+            final IndexFig config, final AliasedEntityIndex entityIndex ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
         this.indexBatchBuffer = indexBatchBuffer;
-        this.failureMonitor = failureMonitor;
         this.entityIndex = entityIndex;
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
@@ -214,15 +212,20 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     }
 
     @Override
-    public void execute() {
-        indexBatchBuffer.put(container);
+    public BetterFuture execute() {
+        RequestBuilderContainer tempContainer = container;
+        container = new RequestBuilderContainer();
+        BetterFuture future = indexBatchBuffer.put(tempContainer);
+        return future;
     }
 
     @Override
     public void executeAndRefresh() {
-        BetterFuture future = indexBatchBuffer.put(container);
+        container.setForceRefresh(true);
+        RequestBuilderContainer tempContainer = container;
+        container = new RequestBuilderContainer();
+        BetterFuture future = indexBatchBuffer.put(tempContainer);
         future.get();
-        entityIndex.refresh();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 4e5687f..2881ce4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -281,7 +281,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     @Override
     public EntityIndexBatch createBatch() {
         return new EsEntityIndexBatchImpl(
-                applicationScope, esProvider.getClient(),indexBatchBuffer, config, failureMonitor, this );
+                applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 364c20a..b5d9528 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -40,12 +40,15 @@ import rx.Subscriber;
 import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -55,24 +58,20 @@ import java.util.concurrent.*;
 public class IndexBatchBufferImpl implements IndexBatchBuffer {
 
     private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
-    private final MetricsFactory metricsFactory;
     private final Counter indexSizeCounter;
     private final Client client;
     private final FailureMonitorImpl failureMonitor;
     private final IndexFig config;
     private final Timer flushTimer;
-    private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
     private final Counter bufferCounter;
     private Observable<List<RequestBuilderContainer>> consumer;
     private Producer producer;
 
     @Inject
     public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
-        this.metricsFactory = metricsFactory;
         this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
         this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
         this.config = config;
-        this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.producer = new Producer();
         this.client = provider.getClient();
@@ -81,18 +80,25 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
     private void consumer() {
+        final AtomicLong queueSize = new AtomicLong();
         //batch up sets of some size and send them in batch
         this.consumer = Observable.create(producer)
+                .subscribeOn(Schedulers.io())
+                .doOnNext(new Action1<RequestBuilderContainer>() {
+                    @Override
+                    public void call(RequestBuilderContainer requestBuilderContainer) {
+                        queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
+                    }
+                })
                 .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
                 .doOnNext(new Action1<List<RequestBuilderContainer>>() {
                     @Override
                     public void call(List<RequestBuilderContainer> containerList) {
-                        for (RequestBuilderContainer container : containerList) {
-                            blockingQueue.add(container);
-                        }
                         flushTimer.time();
                         indexSizeCounter.dec(containerList.size());
-                        execute(config.isForcedRefresh());
+                        if(containerList.size()>0){
+                            execute(containerList);
+                        }
                     }
                 });
         consumer.subscribe();
@@ -109,20 +115,22 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */
-    private void execute(final boolean refresh) {
+    private void execute(final List<RequestBuilderContainer> containers) {
 
-        if (blockingQueue.size() == 0) {
+        if (containers == null || containers.size() == 0) {
             return;
         }
 
-
-        Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
-        blockingQueue.drainTo(containerCollection);
+        final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh());
         //clear the queue or proceed to buffersize
-        Observable.from(containerCollection)
+        Observable.from(containers)
+                .subscribeOn(Schedulers.io())
                 .flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
                     @Override
                     public Observable<ShardReplicationOperationRequestBuilder> call(RequestBuilderContainer requestBuilderContainer) {
+                        if (requestBuilderContainer.isForceRefresh()){
+                            isForceRefresh.set(true);
+                        }
                         return Observable.from(requestBuilderContainer.getBuilder())
                                 .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
                                     @Override
@@ -136,20 +144,20 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
                 .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
                     @Override
                     public void call(List<ShardReplicationOperationRequestBuilder> builders) {
-                        final BulkRequestBuilder bulkRequest = initRequest(refresh);
-                        for(ShardReplicationOperationRequestBuilder builder : builders) {
+                        final BulkRequestBuilder bulkRequest = initRequest(isForceRefresh.get());
+                        for (ShardReplicationOperationRequestBuilder builder : builders) {
                             if (builder instanceof IndexRequestBuilder) {
                                 bulkRequest.add((IndexRequestBuilder) builder);
                             }
                             if (builder instanceof DeleteRequestBuilder) {
                                 bulkRequest.add((DeleteRequestBuilder) builder);
                             }
-                            sendRequest(bulkRequest);
                         }
+                        sendRequest(bulkRequest);
                     }
                 }).toBlocking().lastOrDefault(null);
 
-        for (RequestBuilderContainer container : containerCollection) {
+        for (RequestBuilderContainer container : containers) {
             container.done();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 6cda9f2..e3eeaf4 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -467,18 +467,21 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch batch = ei.createBatch();
 
-        batch.index( appScope, user ).executeAndRefresh();
+        batch.index( appScope, user );
+        batch.executeAndRefresh();
+
         Query query = new Query();
         query.addEqualityFilter( "username", "edanuff" );
         CandidateResults r = ei.search( appScope, SearchTypes.fromTypes( "edanuff" ), query );
         assertEquals( user.getId(), r.get( 0 ).getId() );
 
-        batch.deindex(appScope, user.getId(), user.getVersion() ).executeAndRefresh();
-
+        batch.deindex(appScope, user.getId(), user.getVersion() );
+        batch.executeAndRefresh();
 
         // EntityRef
         query = new Query();
         query.addEqualityFilter( "username", "edanuff" );
+
         r = ei.search(appScope,SearchTypes.fromTypes( "edanuff" ),  query );
 
         assertFalse( r.iterator().hasNext() );