You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 21:01:06 UTC

[27/37] incubator-usergrid git commit: index message comments

index message comments

separate into classes

switch to producer consumer model

removing static class


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/489b2d78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/489b2d78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/489b2d78

Branch: refs/heads/USERGRID-422
Commit: 489b2d78da3167f63d447d5eb8b5048e744b51aa
Parents: 0621294
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 18:42:29 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 26 09:42:24 2015 -0700

----------------------------------------------------------------------
 .../persistence/index/IndexBufferConsumer.java  |  26 +++
 .../persistence/index/IndexBufferProducer.java  |  36 ++++
 .../index/IndexOperationMessage.java            |   5 +-
 .../persistence/index/guice/IndexModule.java    |  11 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |   8 +-
 .../index/impl/EsEntityIndexImpl.java           |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 193 +++++++++++++++++
 .../index/impl/EsIndexBufferProducerImpl.java   |  55 +++++
 .../index/impl/IndexBatchBufferImpl.java        | 206 -------------------
 9 files changed, 325 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
new file mode 100644
index 0000000..ac7489c
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
@@ -0,0 +1,26 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+/**
+ * Consumer of buffered {@link IndexOperationMessage}s; declares no methods of its own.
+ */
+public interface IndexBufferConsumer {
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
new file mode 100644
index 0000000..6338a0c
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -0,0 +1,36 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * Producer of {@link IndexOperationMessage}s: an Rx {@code OnSubscribe} source fed through {@code put}.
+ */
+public interface IndexBufferProducer extends Observable.OnSubscribe<IndexOperationMessage> {
+
+    @Override
+    void call(Subscriber<? super IndexOperationMessage> subscriber);
+
+    BetterFuture put(IndexOperationMessage message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 5acb17e..3a0a702 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
- * Classy class class.
+ * Container for index operations.
  */
 public  class IndexOperationMessage {
     private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
@@ -47,9 +47,6 @@ public  class IndexOperationMessage {
     public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
         return builders;
     }
-    public void done(){
-        containerFuture.done();
-    }
     public BetterFuture<IndexOperationMessage> getFuture(){
         return containerFuture;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 5af148a..d9a14c9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -19,14 +19,12 @@
 
 package org.apache.usergrid.persistence.index.guice;
 
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.*;
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
-import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 
@@ -42,7 +40,8 @@ public class IndexModule extends AbstractModule {
                 .implement(EntityIndex.class, EsEntityIndexImpl.class)
                 .build(EntityIndexFactory.class));
 
-        bind(IndexBatchBuffer.class).to(IndexBatchBufferImpl.class);
+        bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
+        bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 455dba6..2e0fb56 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -76,18 +76,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
 
-    private final IndexBatchBuffer indexBatchBuffer;
+    private final IndexBufferProducer indexBatchBufferProducer;
 
     private final AliasedEntityIndex entityIndex;
     private IndexOperationMessage container;
 
 
-    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
+    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBufferProducer indexBatchBufferProducer,
             final IndexFig config, final AliasedEntityIndex entityIndex ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
-        this.indexBatchBuffer = indexBatchBuffer;
+        this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.entityIndex = entityIndex;
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
@@ -204,7 +204,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public BetterFuture execute() {
         IndexOperationMessage tempContainer = container;
         container = new IndexOperationMessage();
-        return indexBatchBuffer.put(tempContainer);
+        return indexBatchBufferProducer.put(tempContainer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 693b168..ad638c6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -87,7 +87,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
-    private final IndexBatchBuffer indexBatchBuffer;
+    private final IndexBufferProducer indexBatchBufferProducer;
 
     /**
      * We purposefully make this per instance. Some indexes may work, while others may fail
@@ -119,8 +119,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
 
     @Inject
-    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBatchBuffer indexBatchBuffer, final EsProvider provider, final EsIndexCache indexCache) {
-        this.indexBatchBuffer = indexBatchBuffer;
+    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final EsIndexCache indexCache) {
+        this.indexBatchBufferProducer = indexBatchBufferProducer;
         ValidationUtils.validateApplicationScope( appScope );
         this.applicationScope = appScope;
         this.esProvider = provider;
@@ -282,7 +282,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     @Override
     public EntityIndexBatch createBatch() {
         EntityIndexBatch batch = new EsEntityIndexBatchImpl(
-                applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
+                applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this );
         return batch;
     }
 
@@ -434,7 +434,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     public void refresh() {
 
-        BetterFuture future = indexBatchBuffer.put(new IndexOperationMessage());
+        BetterFuture future = indexBatchBufferProducer.put(new IndexOperationMessage());
         future.get();
         //loop through all batches and retrieve promises and call get
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
new file mode 100644
index 0000000..eaca9bd
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -0,0 +1,193 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Consumer for IndexOperationMessages
+ */
+@Singleton
+public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
+    private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class);
+
+    private final IndexFig config;
+    private final FailureMonitorImpl failureMonitor;
+    private final Client client;
+    private final Observable<List<IndexOperationMessage>> consumer;
+    private final Timer flushTimer;
+    private final Counter indexSizeCounter;
+
+    @Inject
+    public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){
+        this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
+        this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
+        this.config = config;
+        this.failureMonitor = new FailureMonitorImpl(config,provider);
+        this.client = provider.getClient();
+
+        final AtomicLong queueSize = new AtomicLong();
+        //buffer messages until the configured timeout elapses or the buffer size is reached, then flush them together
+        this.consumer = Observable.create(producer)
+            .subscribeOn(Schedulers.io())
+            .doOnNext(new Action1<IndexOperationMessage>() {
+                @Override
+                public void call(IndexOperationMessage requestBuilderContainer) {
+                    queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
+                }
+            })
+            .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
+            .doOnNext(new Action1<List<IndexOperationMessage>>() {
+                @Override
+                public void call(List<IndexOperationMessage> containerList) {
+                    flushTimer.time();
+                    if(containerList.size()>0){
+                        execute(containerList);
+                    }
+                }
+            });
+        consumer.subscribe();
+    }
+
+    /**
+     * Flatten the messages into bulk requests, send them, then complete each message's future
+     */
+    private void execute(final List<IndexOperationMessage> operationMessages) {
+
+        if (operationMessages == null || operationMessages.size() == 0) {
+            return;
+        }
+
+        //process and flatten all the messages to builder requests
+        Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
+            .subscribeOn(Schedulers.io())
+            .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
+                @Override
+                public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
+                    return Observable.from(operationMessage.getBuilder())
+                        .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
+                            @Override
+                            public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
+                                return builder;
+                            }
+                        });
+                }
+            });
+
+        //batch shard operations into a bulk request
+        flattenMessages
+            .buffer(config.getIndexBatchSize())
+            .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+                @Override
+                public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+                    final BulkRequestBuilder bulkRequest = initRequest();
+                    for (ShardReplicationOperationRequestBuilder builder : builders) {
+                        indexSizeCounter.dec();
+                        if (builder instanceof IndexRequestBuilder) {
+                            bulkRequest.add((IndexRequestBuilder) builder);
+                        }
+                        if (builder instanceof DeleteRequestBuilder) {
+                            bulkRequest.add((DeleteRequestBuilder) builder);
+                        }
+                    }
+                    sendRequest(bulkRequest);
+                }
+            })
+            .toBlocking().lastOrDefault(null);
+
+        //call back all futures
+        Observable.from(operationMessages)
+            .subscribeOn(Schedulers.io())
+            .doOnNext(new Action1<IndexOperationMessage>() {
+                @Override
+                public void call(IndexOperationMessage operationMessage) {
+                    operationMessage.getFuture().done();
+                }
+            })
+            .toBlocking().lastOrDefault(null);
+    }
+
+    /**
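+     * Create a new bulk request using the configured write consistency level and refresh flag.
+     * @return the newly prepared bulk request builder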
+     */
+    private BulkRequestBuilder initRequest() {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+        bulkRequest.setRefresh(config.isForcedRefresh());
+        return bulkRequest;
+    }
+
+    /**
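+     * Execute the bulk request and throw if the call or any item in the response failed.
+     * @param bulkRequest the request to send; nothing is sent when it contains no actions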
+     */
+    private void sendRequest(BulkRequestBuilder bulkRequest) {
+        //nothing to do, we haven't added anything to the index
+        if (bulkRequest.numberOfActions() == 0) {
+            return;
+        }
+
+        final BulkResponse responses;
+
+        try {
+            responses = bulkRequest.execute().actionGet();
+        } catch (Throwable t) {
+            log.error("Unable to communicate with elasticsearch");
+            failureMonitor.fail("Unable to execute batch", t);
+            throw t;
+        }
+
+        failureMonitor.success();
+
+        for (BulkItemResponse response : responses) {
+            if (response.isFailed()) {
+                throw new RuntimeException("Unable to index documents.  Errors are :"
+                    + response.getFailure().getMessage());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
new file mode 100644
index 0000000..791cea8
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -0,0 +1,55 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Subscriber;
+
+/**
+ * Producer for index operation messages
+ */
+@Singleton
+public class EsIndexBufferProducerImpl implements IndexBufferProducer {
+
+    private final Counter indexSizeCounter;
+    private Subscriber<? super IndexOperationMessage> subscriber;
+
+    @Inject
+    public EsIndexBufferProducerImpl(MetricsFactory metricsFactory){
+        this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class,"index.buffer.size");
+
+    }
+    @Override
+    public void call(Subscriber<? super IndexOperationMessage> subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    public BetterFuture put(IndexOperationMessage message){
+        indexSizeCounter.inc(message.getBuilder().size());
+        subscriber.onNext(message);
+        return message.getFuture();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
deleted file mode 100644
index 92ab582..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Buffer index requests into sets to send.
- */
-@Singleton
-public class IndexBatchBufferImpl implements IndexBatchBuffer {
-
-    private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
-    private final Counter indexSizeCounter;
-    private final Client client;
-    private final FailureMonitorImpl failureMonitor;
-    private final IndexFig config;
-    private final Timer flushTimer;
-    private final Counter bufferCounter;
-    private Observable<List<IndexOperationMessage>> consumer;
-    private Producer producer;
-
-    @Inject
-    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
-        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
-        this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
-        this.config = config;
-        this.failureMonitor = new FailureMonitorImpl(config,provider);
-        this.producer = new Producer();
-        this.client = provider.getClient();
-        bufferCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size");
-        consumer();
-    }
-
-    private void consumer() {
-        final AtomicLong queueSize = new AtomicLong();
-        //batch up sets of some size and send them in batch
-        this.consumer = Observable.create(producer)
-                .subscribeOn(Schedulers.io())
-                .doOnNext(new Action1<IndexOperationMessage>() {
-                    @Override
-                    public void call(IndexOperationMessage requestBuilderContainer) {
-                        queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
-                    }
-                })
-                .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
-                .doOnNext(new Action1<List<IndexOperationMessage>>() {
-                    @Override
-                    public void call(List<IndexOperationMessage> containerList) {
-                        flushTimer.time();
-                        indexSizeCounter.dec(containerList.size());
-                        if(containerList.size()>0){
-                            execute(containerList);
-                        }
-                    }
-                });
-        consumer.subscribe();
-    }
-
-    @Override
-    public BetterFuture put(IndexOperationMessage container){
-        bufferCounter.inc();
-        producer.put(container);
-        return container.getFuture();
-    }
-
-
-    /**
-     * Execute the request, check for errors, then re-init the batch for future use
-     */
-    private void execute(final List<IndexOperationMessage> containers) {
-
-        if (containers == null || containers.size() == 0) {
-            return;
-        }
-
-        final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh());
-        //clear the queue or proceed to buffer size
-        Observable.from(containers)
-                .subscribeOn(Schedulers.io())
-                .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
-                    @Override
-                    public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage requestBuilderContainer) {
-                        return Observable.from(requestBuilderContainer.getBuilder())
-                                .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
-                                    @Override
-                                    public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
-                                        return builder;
-                                    }
-                                });
-                    }
-                })
-                .buffer(config.getIndexBatchSize())
-                .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
-                    @Override
-                    public void call(List<ShardReplicationOperationRequestBuilder> builders) {
-                        final BulkRequestBuilder bulkRequest = initRequest(isForceRefresh.get());
-                        for (ShardReplicationOperationRequestBuilder builder : builders) {
-                            if (builder instanceof IndexRequestBuilder) {
-                                bulkRequest.add((IndexRequestBuilder) builder);
-                            }
-                            if (builder instanceof DeleteRequestBuilder) {
-                                bulkRequest.add((DeleteRequestBuilder) builder);
-                            }
-                        }
-                        sendRequest(bulkRequest);
-                    }
-                }).toBlocking().lastOrDefault(null);
-
-        for (IndexOperationMessage container : containers) {
-            container.done();
-        }
-    }
-
-    private BulkRequestBuilder initRequest(boolean refresh) {
-        BulkRequestBuilder bulkRequest = client.prepareBulk();
-        bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
-        bulkRequest.setRefresh(refresh);
-        return bulkRequest;
-    }
-
-    private void sendRequest(BulkRequestBuilder bulkRequest) {
-        //nothing to do, we haven't added anthing to the index
-        if (bulkRequest.numberOfActions() == 0) {
-            return;
-        }
-
-        final BulkResponse responses;
-
-        try {
-            responses = bulkRequest.execute().actionGet();
-        } catch (Throwable t) {
-            log.error("Unable to communicate with elasticsearch");
-            failureMonitor.fail("Unable to execute batch", t);
-            throw t;
-        }
-
-        failureMonitor.success();
-
-        for (BulkItemResponse response : responses) {
-            if (response.isFailed()) {
-                throw new RuntimeException("Unable to index documents.  Errors are :"
-                        + response.getFailure().getMessage());
-            }
-        }
-    }
-
-
-    private static class Producer implements Observable.OnSubscribe<IndexOperationMessage> {
-
-        private Subscriber<? super IndexOperationMessage> subscriber;
-
-        @Override
-        public void call(Subscriber<? super IndexOperationMessage> subscriber) {
-            this.subscriber = subscriber;
-        }
-
-        public void put(IndexOperationMessage r){
-            subscriber.onNext(r);
-        }
-    }
-
-}