You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/21 14:36:50 UTC

[1/2] incubator-usergrid git commit: moving towards futures

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-273-indexbuffer 8659771b1 -> 88435c852


moving towards futures


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2cd7cfe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2cd7cfe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2cd7cfe4

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 2cd7cfe46e826752883a53008bda013ab6396937
Parents: 8659771
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 19 12:25:29 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 19 12:25:29 2015 -0800

----------------------------------------------------------------------
 .../persistence/index/IndexBatchBuffer.java     | 25 ++++++++++++++++++--
 .../index/impl/EsEntityIndexBatchImpl.java      | 25 ++++++++++++++++----
 .../index/impl/IndexBatchBufferImpl.java        | 23 ++++++++++--------
 3 files changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2cd7cfe4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 6314bf2..5a3289b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -16,11 +16,14 @@
  */
 package org.apache.usergrid.persistence.index;
 
+import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import rx.Observable;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 /**
  * Buffer for index operations,
@@ -42,13 +45,31 @@ public interface IndexBatchBuffer {
      *
      * @param builder
      */
-    public Observable put(IndexRequestBuilder builder);
+    public IndexBatchBuffer.BetterFuture put(IndexRequestBuilder builder);
 
     /**
      * put request into buffer
      *
      * @param builder
      */
-    public Observable put(DeleteRequestBuilder builder);
+    public IndexBatchBuffer.BetterFuture put(DeleteRequestBuilder builder);
 
+    public static class BetterFuture<T>{
+        FutureTask<T> future;
+        public BetterFuture(Callable<T> callable){
+            future = new FutureTask<>(callable);
+        }
+
+        public void done(){
+            future.run();
+        }
+
+        public T get(){
+            try {
+                return future.get();
+            }catch (Exception e){
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2cd7cfe4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 1bc21d3..f994262 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.util.concurrent.Futures;
 import org.apache.usergrid.persistence.index.*;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -93,6 +94,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final FailureMonitor failureMonitor;
 
     private final AliasedEntityIndex entityIndex;
+    private final ArrayList<IndexBatchBuffer.BetterFuture> promises;
 
 
     public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -106,6 +108,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
+        this.promises = new ArrayList<IndexBatchBuffer.BetterFuture>();
     }
 
 
@@ -141,7 +144,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         final String entityType = entity.getId().getType();
         IndexRequestBuilder builder =
                 client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
-        indexBatchBuffer.put(builder);
+        promises.add(indexBatchBuffer.put(builder));
         return this;
     }
 
@@ -187,7 +190,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
                    public Object call(String index) {
                        try {
                            DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
-                           indexBatchBuffer.put(builder);
+                           promises.add(indexBatchBuffer.put(builder));
                        }catch (Exception e){
                            log.error("failed to deindex",e);
                            throw e;
@@ -220,14 +223,28 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public void execute() {
-        indexBatchBuffer.flush();
+//        indexBatchBuffer.flush();
+        Observable.from(promises)
+                .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
+                    @Override
+                    public void call(IndexBatchBuffer.BetterFuture betterFuture) {
+                        betterFuture.get();
+                    }
+                }).toBlocking().lastOrDefault(null);
     }
 
 
 
     @Override
     public void executeAndRefresh() {
-        indexBatchBuffer.flushAndRefresh();
+//        indexBatchBuffer.flushAndRefresh();
+        Observable.from(promises)
+                .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
+                    @Override
+                    public void call(IndexBatchBuffer.BetterFuture betterFuture) {
+                        betterFuture.get();
+                    }
+                }).toBlocking().lastOrDefault(null);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2cd7cfe4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index aa4ea17..37b3369 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -35,9 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
-import rx.Subscription;
 import rx.functions.Action1;
-import sun.jvm.hotspot.opto.Block;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -64,14 +62,14 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
 
-    public Observable put(IndexRequestBuilder builder){
+    public BetterFuture put(IndexRequestBuilder builder){
         RequestBuilderContainer container = new RequestBuilderContainer(builder);
         metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
         producer.put(container);
         return container.getFuture();
     }
 
-    public Observable put(DeleteRequestBuilder builder){
+    public BetterFuture put(DeleteRequestBuilder builder){
         RequestBuilderContainer container = new RequestBuilderContainer(builder);
         metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
         producer.put(new RequestBuilderContainer(builder));
@@ -214,26 +212,31 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
 
     private static class RequestBuilderContainer{
         private final ShardReplicationOperationRequestBuilder builder;
-        private final Observable<RequestBuilderContainer> containerFuture;
+        private final BetterFuture<RequestBuilderContainer> containerFuture;
 
         public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
             final RequestBuilderContainer parent = this;
             this.builder = builder;
             this.containerFuture
-                    = Observable.create(new Observable.OnSubscribe<RequestBuilderContainer>(){
-                
-            })
+                    = new BetterFuture<>(new Callable<RequestBuilderContainer>() {
+                @Override
+                public RequestBuilderContainer call() throws Exception {
+                    return parent;
+                }
+            });
         }
 
         public ShardReplicationOperationRequestBuilder getBuilder(){
             return builder;
         }
         public void done(){
-            containerFuture.();
+            containerFuture.done();
         }
-        public Observable<RequestBuilderContainer> getFuture(){
+        public BetterFuture<RequestBuilderContainer> getFuture(){
             return containerFuture;
         }
     }
 
+
+
 }


[2/2] incubator-usergrid git commit: Adding Batch Buffer

Posted by sf...@apache.org.
Adding Batch Buffer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/88435c85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/88435c85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/88435c85

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 88435c8525810dc46e204d6d2254d8e135291f4b
Parents: 2cd7cfe
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 19 17:51:06 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 19 17:51:06 2015 -0800

----------------------------------------------------------------------
 .../persistence/core/future/BetterFuture.java   |  42 +++
 .../core/future/ObservableFuture.java           |  42 +++
 .../persistence/index/IndexBatchBuffer.java     |  36 +--
 .../index/impl/EsEntityIndexBatchImpl.java      |  35 ++-
 .../index/impl/IndexBatchBufferImpl.java        | 260 +++++++++----------
 5 files changed, 226 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
new file mode 100644
index 0000000..6146fe8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/**
+ * Future without the exception nastiness
+ */
+public  class BetterFuture<T>{
+    FutureTask<T> future;
+    public BetterFuture(Callable<T> callable){
+        future = new FutureTask<>(callable);
+    }
+
+    public void done(){
+        future.run();
+    }
+
+    public T get(){
+        try {
+            return future.get();
+        }catch (Exception e){
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
new file mode 100644
index 0000000..d08dd92
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ *
+ */
+public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
+
+    private Subscriber<? super T> subscriber;
+
+    @Override
+    public void call(Subscriber<? super T> subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    public void done(T t){
+        this.subscriber.onCompleted();
+    }
+
+    public void emit(T t){
+        this.subscriber.onNext(t);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 5a3289b..b24d37b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.usergrid.persistence.index;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -26,50 +27,23 @@ import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 
 /**
- * Buffer for index operations,
+ * Buffer index requests into sets to send.
  */
 public interface IndexBatchBuffer {
 
     /**
-     * flush out buffer and execute
-     */
-    public void flush();
-
-    /**
-     * flush out buffer and execute
-     */
-    public void flushAndRefresh();
-
-    /**
-     * put request into buffer
+     * put request into buffer, returns a future for the buffered request
      *
      * @param builder
      */
-    public IndexBatchBuffer.BetterFuture put(IndexRequestBuilder builder);
+    public BetterFuture put(IndexRequestBuilder builder);
 
     /**
      * put request into buffer
      *
      * @param builder
      */
-    public IndexBatchBuffer.BetterFuture put(DeleteRequestBuilder builder);
-
-    public static class BetterFuture<T>{
-        FutureTask<T> future;
-        public BetterFuture(Callable<T> callable){
-            future = new FutureTask<>(callable);
-        }
+    public BetterFuture put(DeleteRequestBuilder builder);
 
-        public void done(){
-            future.run();
-        }
 
-        public T get(){
-            try {
-                return future.get();
-            }catch (Exception e){
-                throw new RuntimeException(e);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index f994262..896c038 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -18,16 +18,12 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.util.concurrent.Futures;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.index.*;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -94,7 +90,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final FailureMonitor failureMonitor;
 
     private final AliasedEntityIndex entityIndex;
-    private final ArrayList<IndexBatchBuffer.BetterFuture> promises;
+    private final ConcurrentLinkedQueue<BetterFuture> promises;
 
 
     public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -108,14 +104,12 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
-        this.promises = new ArrayList<IndexBatchBuffer.BetterFuture>();
+        this.promises = new ConcurrentLinkedQueue<>();
     }
 
 
     @Override
     public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
-
-
         IndexValidationUtils.validateIndexScope( indexScope );
         ValidationUtils.verifyEntityWrite( entity );
 
@@ -225,12 +219,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public void execute() {
 //        indexBatchBuffer.flush();
         Observable.from(promises)
-                .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
+                .doOnNext(new Action1<BetterFuture>() {
                     @Override
-                    public void call(IndexBatchBuffer.BetterFuture betterFuture) {
+                    public void call(BetterFuture betterFuture) {
                         betterFuture.get();
                     }
                 }).toBlocking().lastOrDefault(null);
+        promises.clear();
     }
 
 
@@ -238,13 +233,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     @Override
     public void executeAndRefresh() {
 //        indexBatchBuffer.flushAndRefresh();
-        Observable.from(promises)
-                .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
-                    @Override
-                    public void call(IndexBatchBuffer.BetterFuture betterFuture) {
-                        betterFuture.get();
-                    }
-                }).toBlocking().lastOrDefault(null);
+        Iterator<BetterFuture> iterator = promises.iterator();
+        while(iterator.hasNext()){
+            iterator.next().get();
+        }
+        promises.clear();
+        entityIndex.refresh();
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 37b3369..d7e6bf1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -20,6 +20,7 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBatchBuffer;
 import org.apache.usergrid.persistence.index.IndexFig;
@@ -35,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
+import rx.functions.Action0;
 import rx.functions.Action1;
 
 import java.util.ArrayList;
@@ -44,199 +46,181 @@ import java.util.concurrent.*;
 
 
 /**
- * Classy class class.
+ * Buffer index requests into sets to send.
  */
 @Singleton
 public class IndexBatchBufferImpl implements IndexBatchBuffer {
 
     private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
     private final MetricsFactory metricsFactory;
+    private final Counter indexSizeCounter;
+    private final Client client;
+    private final FailureMonitorImpl failureMonitor;
+    private final IndexFig config;
+    private final Timer flushTimer;
+    private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
+    private Observable<List<RequestBuilderContainer>> consumer;
     private Producer producer;
-    private Consumer consumer;
 
     @Inject
-    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
+    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
         this.metricsFactory = metricsFactory;
+        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
+        this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
+        this.config = config;
+        this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
+        this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.producer = new Producer();
-        this.consumer = new Consumer(config,producer,metricsFactory, provider);
+        this.client = provider.getClient();
+
+        consumer();
     }
 
+    private void consumer() {
+        //batch up sets of some size and send them in batch
+        this.consumer = Observable.create(producer)
+                .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
+                .doOnNext(new Action1<List<RequestBuilderContainer>>() {
+                    @Override
+                    public void call(List<RequestBuilderContainer> containerList) {
+                        for (RequestBuilderContainer container : containerList) {
+                            blockingQueue.add(container);
+                        }
+                        flushTimer.time();
+                        indexSizeCounter.dec(containerList.size());
+                        execute(config.isForcedRefresh());
+                    }
+                });
+        consumer.subscribe();
+    }
 
-    public BetterFuture put(IndexRequestBuilder builder){
+    @Override
+    public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
         RequestBuilderContainer container = new RequestBuilderContainer(builder);
         metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
         producer.put(container);
         return container.getFuture();
     }
 
-    public BetterFuture put(DeleteRequestBuilder builder){
+    @Override
+    public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
         RequestBuilderContainer container = new RequestBuilderContainer(builder);
         metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
-        producer.put(new RequestBuilderContainer(builder));
+        producer.put(container);
         return container.getFuture();
     }
 
-    public void flushAndRefresh(){
-       try {
-           Thread.sleep(500);
-       }catch (Exception e){
-
-       }
-    }
-    public void flush(){
-        try {
-            Thread.sleep(500);
-        }catch (Exception e){
-
-        }
-    }
 
-    private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
 
-        private Subscriber<? super RequestBuilderContainer> subscriber;
+    private static class RequestBuilderContainer{
+        private final ShardReplicationOperationRequestBuilder builder;
+        private final BetterFuture<ShardReplicationOperationRequestBuilder> containerFuture;
 
-        @Override
-        public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
-            this.subscriber = subscriber;
+        public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
+            final RequestBuilderContainer parent = this;
+            this.builder = builder;
+            this.containerFuture = new BetterFuture<>(new Callable<ShardReplicationOperationRequestBuilder>() {
+                @Override
+                public ShardReplicationOperationRequestBuilder call() throws Exception {
+                    return parent.getBuilder();
+                }
+            });
         }
 
-        public void put(RequestBuilderContainer r){
-            subscriber.onNext(r);
+        public ShardReplicationOperationRequestBuilder getBuilder(){
+            return builder;
+        }
+        private void done(){
+            containerFuture.done();
+        }
+        public BetterFuture<ShardReplicationOperationRequestBuilder> getFuture(){
+            return containerFuture;
         }
     }
+    /**
+     * Execute the request, check for errors, then re-init the batch for future use
+     */
+    private void execute( boolean refresh) {
 
-    public static class Consumer {
-        private final Observable<List<RequestBuilderContainer>> consumer;
-        private final Timer flushTimer;
-        private final Counter indexSizeCounter;
-        private final BlockingQueue<RequestBuilderContainer> blockingQueue;
-        private final Client client;
-        private final IndexFig config;
-        private final FailureMonitorImpl failureMonitor;
-
-        public Consumer(final IndexFig config, Producer producer,MetricsFactory metricsFactory, final EsProvider provider){
-            this.config = config;
-            this.client = provider.getClient();
-            this.failureMonitor = new FailureMonitorImpl(config,provider);
-            this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
-            this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
-            this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
-            this.consumer = Observable.create(producer)
-                    .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
-                    .doOnNext(new Action1<List<RequestBuilderContainer>>() {
-                        @Override
-                        public void call(List<RequestBuilderContainer> containerList) {
-                            for (RequestBuilderContainer container : containerList) {
-                                blockingQueue.add(container);
-                            }
-                            flushTimer.time();
-                            indexSizeCounter.dec(containerList.size());
-                            execute(config.isForcedRefresh());
-                        }
-                    });
-            consumer.subscribe();
+        if (blockingQueue.size() == 0) {
+            return;
         }
 
-        /**
-         * Execute the request, check for errors, then re-init the batch for future use
-         */
-        private void execute(boolean refresh) {
-
-            if (blockingQueue.size() == 0) {
-                return;
-            }
-
-            BulkRequestBuilder bulkRequest = initRequest(refresh);
+        BulkRequestBuilder bulkRequest = initRequest(refresh);
 
-            Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
-            blockingQueue.drainTo(containerCollection);
-            int count = 0;
-            //clear the queue or proceed to buffersize
-            for (RequestBuilderContainer container : containerCollection) {
+        Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
+        blockingQueue.drainTo(containerCollection);
+        int count = 0;
+        //clear the queue or proceed to buffersize
+        for (RequestBuilderContainer container : containerCollection) {
 
-                ShardReplicationOperationRequestBuilder builder = container.getBuilder();
-                //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
-                if (builder instanceof IndexRequestBuilder) {
-                    bulkRequest.add((IndexRequestBuilder) builder);
-                }
-                if (builder instanceof DeleteRequestBuilder) {
-                    bulkRequest.add((DeleteRequestBuilder) builder);
-                }
+            ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+            //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
+            if (builder instanceof IndexRequestBuilder) {
+                bulkRequest.add((IndexRequestBuilder) builder);
+            }
+            if (builder instanceof DeleteRequestBuilder) {
+                bulkRequest.add((DeleteRequestBuilder) builder);
+            }
 
-                if (count++ == config.getIndexBatchSize()) {
-                    sendRequest(bulkRequest);
-                    bulkRequest = initRequest(refresh);
+            if (count++ == config.getIndexBatchSize()) {
+                sendRequest(bulkRequest);
+                bulkRequest = initRequest(refresh);
 
-                }
-            }
-            sendRequest(bulkRequest);
-            for (RequestBuilderContainer container : containerCollection) {
-                container.done();
             }
         }
-
-        private BulkRequestBuilder initRequest(boolean refresh) {
-            BulkRequestBuilder bulkRequest = client.prepareBulk();
-            bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
-            bulkRequest.setRefresh(refresh);
-            return bulkRequest;
+        sendRequest(bulkRequest);
+        for (RequestBuilderContainer container : containerCollection) {
+            container.done();
         }
+    }
 
-        private void sendRequest(BulkRequestBuilder bulkRequest) {
-            //nothing to do, we haven't added anthing to the index
-            if (bulkRequest.numberOfActions() == 0) {
-                return;
-            }
+    private BulkRequestBuilder initRequest(boolean refresh) {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+        bulkRequest.setRefresh(refresh);
+        return bulkRequest;
+    }
 
-            final BulkResponse responses;
+    private void sendRequest(BulkRequestBuilder bulkRequest) {
+        //nothing to do, we haven't added anything to the index
+        if (bulkRequest.numberOfActions() == 0) {
+            return;
+        }
 
-            try {
-                responses = bulkRequest.execute().actionGet();
-            } catch (Throwable t) {
-                log.error("Unable to communicate with elasticsearch");
-                failureMonitor.fail("Unable to execute batch", t);
-                throw t;
-            }
+        final BulkResponse responses;
 
-            failureMonitor.success();
+        try {
+            responses = bulkRequest.execute().actionGet();
+        } catch (Throwable t) {
+            log.error("Unable to communicate with elasticsearch");
+            failureMonitor.fail("Unable to execute batch", t);
+            throw t;
+        }
 
-            for (BulkItemResponse response : responses) {
-                if (response.isFailed()) {
-                    throw new RuntimeException("Unable to index documents.  Errors are :"
-                            + response.getFailure().getMessage());
-                }
+        failureMonitor.success();
+
+        for (BulkItemResponse response : responses) {
+            if (response.isFailed()) {
+                throw new RuntimeException("Unable to index documents.  Errors are :"
+                        + response.getFailure().getMessage());
             }
         }
-
     }
 
-    private static class RequestBuilderContainer{
-        private final ShardReplicationOperationRequestBuilder builder;
-        private final BetterFuture<RequestBuilderContainer> containerFuture;
 
-        public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
-            final RequestBuilderContainer parent = this;
-            this.builder = builder;
-            this.containerFuture
-                    = new BetterFuture<>(new Callable<RequestBuilderContainer>() {
-                @Override
-                public RequestBuilderContainer call() throws Exception {
-                    return parent;
-                }
-            });
-        }
+    private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
 
-        public ShardReplicationOperationRequestBuilder getBuilder(){
-            return builder;
-        }
-        public void done(){
-            containerFuture.done();
+        private Subscriber<? super RequestBuilderContainer> subscriber;
+
+        @Override
+        public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
+            this.subscriber = subscriber;
         }
-        public BetterFuture<RequestBuilderContainer> getFuture(){
-            return containerFuture;
+
+        public void put(RequestBuilderContainer r){
+            subscriber.onNext(r);
         }
     }
 
-
-
 }