You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/27 00:38:33 UTC

[17/24] incubator-usergrid git commit: undo change

undo change

remove synchronous calls

new promises structure, remove executeAndRefresh

add refresh


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/090902a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/090902a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/090902a1

Branch: refs/heads/two-dot-o
Commit: 090902a10638b2a6b0c449a06b1316c055039a2a
Parents: 649a50e
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 11:50:54 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 18:38:01 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  3 +-
 .../corepersistence/CpRelationManager.java      |  7 +--
 .../batch/job/AbstractSchedulerRuntimeIT.java   |  2 +-
 .../usergrid/persistence/CollectionIT.java      |  2 +-
 .../persistence/index/EntityIndexBatch.java     |  5 --
 .../persistence/index/IndexBatchBuffer.java     | 10 +--
 .../usergrid/persistence/index/IndexFig.java    |  4 +-
 .../index/IndexOperationMessage.java            | 57 +++++++++++++++++
 .../index/RequestBuilderContainer.java          | 65 --------------------
 .../index/impl/EsEntityIndexBatchImpl.java      | 28 ++-------
 .../index/impl/EsEntityIndexImpl.java           | 10 ++-
 .../index/impl/IndexBatchBufferImpl.java        | 37 +++++------
 .../impl/EntityConnectionIndexImplTest.java     |  7 ++-
 .../persistence/index/impl/EntityIndexTest.java | 26 +++++---
 14 files changed, 112 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7905c43..8f432a2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -980,7 +980,6 @@ public class CpEntityManager implements EntityManager {
         // update in all containing collections and connection indexes
         CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef );
         rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
-        future.get();
     }
 
 
@@ -2830,7 +2829,7 @@ public class CpEntityManager implements EntityManager {
         //
         //        batch.index(appAllTypesScope, memberEntity);
 
-        batch.execute().get();
+        batch.execute();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 07fc45e..302f18d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -442,7 +442,7 @@ public class CpRelationManager implements RelationManager {
                 } ).count().toBlocking().lastOrDefault( 0 );
 
 
-        entityIndexBatch.execute().get();
+        entityIndexBatch.execute();
 
         logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
     }
@@ -871,7 +871,6 @@ public class CpRelationManager implements RelationManager {
                 }
             }
         }
-        future.get();
     }
 
     @Override
@@ -1068,8 +1067,6 @@ public class CpRelationManager implements RelationManager {
         batchUpdateEntityConnection( m, false, connection, UUIDGenerator.newTimeUUID() );
         batchExecute( m, CassandraService.RETRY_COUNT );
 
-        future.get();
-
         return connection;
     }
 
@@ -1294,7 +1291,7 @@ public class CpRelationManager implements RelationManager {
 //
 //        batch.deindex( allTypesIndexScope, targetEntity );
 
-        batch.execute().get();
+        batch.execute();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
index 81836ae..1a15d92 100644
--- a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
@@ -96,7 +96,7 @@ public class AbstractSchedulerRuntimeIT {
         JobSchedulerService jobScheduler = springResource.getBean( JobSchedulerService.class );
         jobScheduler.setJobListener( listener );
         if ( jobScheduler.state() != State.RUNNING ) {
-//            jobScheduler.startAndWait();
+           jobScheduler.startAndWait();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 1881fff..641d3ad 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -911,7 +911,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put( "keywords", "Action, New" );
         Entity thirdGame = em.create( "game", properties );
 
-        em.refreshIndex();
+        em.refreshIndex();//need to track all batches then resolve promises
 
         Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index bf606bc..580a7f4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -68,9 +68,4 @@ public interface EntityIndexBatch {
      */
     public BetterFuture execute();
 
-    /**
-     * Execute the batch and force the refresh
-     * @return future to guarantee execution
-     */
-    public BetterFuture executeAndRefresh();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 1d71bcd..d1015a0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -17,14 +17,6 @@
 package org.apache.usergrid.persistence.index;
 
 import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import rx.Observable;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
 
 /**
  * * Buffer index requests into sets to send,
@@ -36,7 +28,7 @@ public interface IndexBatchBuffer {
      *
      * @param container
      */
-    public BetterFuture put(RequestBuilderContainer container);
+    public BetterFuture put(IndexOperationMessage container);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 7434b99..8040a62 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -133,7 +133,7 @@ public interface IndexFig extends GuicyFig {
      * size of the buffer to build up before you send results
      * @return
      */
-    @Default("300")
+    @Default("1000")
     @Key( INDEX_BUFFER_SIZE )
     int getIndexBufferSize();
 
@@ -141,7 +141,7 @@ public interface IndexFig extends GuicyFig {
      * Request batch size for ES
      * @return
      */
-    @Default("100")
+    @Default("1000")
     @Key( INDEX_BATCH_SIZE)
     int getIndexBatchSize();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
new file mode 100644
index 0000000..5acb17e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Holds the request builders queued by one batch and the future marked done after they are executed.
+ */
+public  class IndexOperationMessage {
+    private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
+    private final BetterFuture<IndexOperationMessage> containerFuture;
+
+    public IndexOperationMessage(){
+        final IndexOperationMessage parent = this;
+        builders = new ConcurrentLinkedQueue<>();
+        this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
+            @Override
+            public IndexOperationMessage call() throws Exception {
+                return parent;
+            }
+        });
+    }
+
+    public void add(ShardReplicationOperationRequestBuilder builder){
+        builders.add(builder);
+    }
+    public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
+        return builders;
+    }
+    public void done(){
+        containerFuture.done();
+    }
+    public BetterFuture<IndexOperationMessage> getFuture(){
+        return containerFuture;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
deleted file mode 100644
index e7928ff..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.index;
-
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
-
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Classy class class.
- */
-public  class RequestBuilderContainer{
-    private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
-    private final BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> containerFuture;
-
-    private  boolean forceRefresh = false;
-
-    public RequestBuilderContainer(){
-        final RequestBuilderContainer parent = this;
-        builders = new ConcurrentLinkedQueue<>();
-        this.containerFuture = new BetterFuture<>(new Callable<Iterator<ShardReplicationOperationRequestBuilder>>() {
-            @Override
-            public Iterator<ShardReplicationOperationRequestBuilder> call() throws Exception {
-                return parent.getBuilder().iterator();
-            }
-        });
-    }
-
-    public void add(ShardReplicationOperationRequestBuilder builder){
-        builders.add(builder);
-    }
-    public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
-        return builders;
-    }
-    public void done(){
-        containerFuture.done();
-    }
-    public BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> getFuture(){
-        return containerFuture;
-    }
-
-    public boolean isForceRefresh() {
-        return forceRefresh;
-    }
-    public void setForceRefresh(boolean forceRefresh) {
-        this.forceRefresh = forceRefresh;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index c0ddfdc..455dba6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -19,16 +19,9 @@
 package org.apache.usergrid.persistence.index.impl;
 
 import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.util.concurrent.Futures;
 import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.index.*;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
@@ -56,11 +49,8 @@ import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.field.UUIDField;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
 
-import com.google.common.base.Joiner;
 import rx.Observable;
-import rx.functions.Action1;
 import rx.functions.Func1;
-import rx.schedulers.Schedulers;
 
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
@@ -71,7 +61,6 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PR
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
-import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName;
 
 
 public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@@ -90,7 +79,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexBatchBuffer indexBatchBuffer;
 
     private final AliasedEntityIndex entityIndex;
-    private RequestBuilderContainer container;
+    private IndexOperationMessage container;
 
 
     public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -104,7 +93,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
         //constrained
-        this.container = new RequestBuilderContainer();
+        this.container = new IndexOperationMessage();
     }
 
 
@@ -213,16 +202,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public BetterFuture execute() {
-        RequestBuilderContainer tempContainer = container;
-        container = new RequestBuilderContainer();
-        return indexBatchBuffer.put(tempContainer);
-    }
-
-    @Override
-    public BetterFuture executeAndRefresh() {
-        container.setForceRefresh(true);
-        RequestBuilderContainer tempContainer = container;
-        container = new RequestBuilderContainer();
+        IndexOperationMessage tempContainer = container;
+        container = new IndexOperationMessage();
         return indexBatchBuffer.put(tempContainer);
     }
 
@@ -364,5 +345,4 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         return processed;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 2881ce4..693b168 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -23,6 +23,7 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -280,8 +281,9 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     @Override
     public EntityIndexBatch createBatch() {
-        return new EsEntityIndexBatchImpl(
+        EntityIndexBatch batch = new EsEntityIndexBatchImpl(
                 applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
+        return batch;
     }
 
 
@@ -432,6 +434,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     public void refresh() {
 
+        BetterFuture future = indexBatchBuffer.put(new IndexOperationMessage());
+        future.get();
+        //TODO: loop through all batches, retrieve their promises and call get
+
         final RetryOperation retryOperation = new RetryOperation() {
             @Override
             public boolean doOp() {
@@ -579,7 +585,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
         if ( response.isAcknowledged() ) {
             logger.info( "Deleted index: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());
             //invlaidate the alias
-            aliasCache.invalidate( alias );
+            aliasCache.invalidate(alias);
         }
         else {
             logger.info( "Failed to delete index: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 73af70f..92ab582 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -24,7 +24,7 @@ import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBatchBuffer;
 import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.RequestBuilderContainer;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -37,14 +37,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
-import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,7 +60,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     private final IndexFig config;
     private final Timer flushTimer;
     private final Counter bufferCounter;
-    private Observable<List<RequestBuilderContainer>> consumer;
+    private Observable<List<IndexOperationMessage>> consumer;
     private Producer producer;
 
     @Inject
@@ -84,16 +80,16 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
         //batch up sets of some size and send them in batch
         this.consumer = Observable.create(producer)
                 .subscribeOn(Schedulers.io())
-                .doOnNext(new Action1<RequestBuilderContainer>() {
+                .doOnNext(new Action1<IndexOperationMessage>() {
                     @Override
-                    public void call(RequestBuilderContainer requestBuilderContainer) {
+                    public void call(IndexOperationMessage requestBuilderContainer) {
                         queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
                     }
                 })
                 .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
-                .doOnNext(new Action1<List<RequestBuilderContainer>>() {
+                .doOnNext(new Action1<List<IndexOperationMessage>>() {
                     @Override
-                    public void call(List<RequestBuilderContainer> containerList) {
+                    public void call(List<IndexOperationMessage> containerList) {
                         flushTimer.time();
                         indexSizeCounter.dec(containerList.size());
                         if(containerList.size()>0){
@@ -105,7 +101,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
     @Override
-    public BetterFuture put(RequestBuilderContainer container){
+    public BetterFuture put(IndexOperationMessage container){
         bufferCounter.inc();
         producer.put(container);
         return container.getFuture();
@@ -115,7 +111,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */
-    private void execute(final List<RequestBuilderContainer> containers) {
+    private void execute(final List<IndexOperationMessage> containers) {
 
         if (containers == null || containers.size() == 0) {
             return;
@@ -125,12 +121,9 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
         //clear the queue or proceed to buffer size
         Observable.from(containers)
                 .subscribeOn(Schedulers.io())
-                .flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
+                .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
                     @Override
-                    public Observable<ShardReplicationOperationRequestBuilder> call(RequestBuilderContainer requestBuilderContainer) {
-                        if (requestBuilderContainer.isForceRefresh()){
-                            isForceRefresh.set(true);
-                        }
+                    public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage requestBuilderContainer) {
                         return Observable.from(requestBuilderContainer.getBuilder())
                                 .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
                                     @Override
@@ -157,7 +150,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
                     }
                 }).toBlocking().lastOrDefault(null);
 
-        for (RequestBuilderContainer container : containers) {
+        for (IndexOperationMessage container : containers) {
             container.done();
         }
     }
@@ -196,16 +189,16 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
 
-    private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
+    private static class Producer implements Observable.OnSubscribe<IndexOperationMessage> {
 
-        private Subscriber<? super RequestBuilderContainer> subscriber;
+        private Subscriber<? super IndexOperationMessage> subscriber;
 
         @Override
-        public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
+        public void call(Subscriber<? super IndexOperationMessage> subscriber) {
             this.subscriber = subscriber;
         }
 
-        public void put(RequestBuilderContainer r){
+        public void put(IndexOperationMessage r){
             subscriber.onNext(r);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index c65f106..37b5e90 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -136,7 +136,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.index( searchScope, oj );
         batch.index( otherIndexScope, oj );
 
-        batch.executeAndRefresh().get();
+        batch.execute().get();
         personLikesIndex.refresh();
 
 
@@ -267,7 +267,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.index( searchScope, oj );
         batch.index( otherIndexScope, oj );
 
-        batch.executeAndRefresh().get();
+        batch.execute().get();
         personLikesIndex.refresh();
 
         EsTestUtils.waitForTasks( personLikesIndex );
@@ -287,7 +287,8 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.deindex( searchScope, egg );
         batch.deindex( searchScope, muffin );
         batch.deindex( searchScope, oj );
-        batch.executeAndRefresh().get();
+        batch.execute().get();
+        personLikesIndex.refresh();
 
         likes = personLikesIndex.search( searchScope,
                 SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 918ad35..83ba1ec 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -95,8 +95,8 @@ public class EntityIndexTest extends BaseIT {
         final ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
 
         long now = System.currentTimeMillis();
-        final int threads = 1000;
-        final int size = 50;
+        final int threads = 20;
+        final int size = 20;
         final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
         final IndexScope indexScope = new IndexScopeImpl(appId, "things");
         final String entityType = "thing";
@@ -209,7 +209,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
         entityIndexBatch.deindex(indexScope, crs.get(0));
         entityIndexBatch.deindex(indexScope, crs.get(1));
-        entityIndexBatch.executeAndRefresh().get();
+        entityIndexBatch.execute().get();
         entityIndex.refresh();
 
         //Hilda Youn
@@ -222,7 +222,8 @@ public class EntityIndexTest extends BaseIT {
         List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
         EntityIndexBatch batch = entityIndex.createBatch();
         insertJsonBlob(sampleJson,batch, entityType, indexScope, max,startIndex);
-        batch.executeAndRefresh().get();
+        batch.execute().get();
+        entityIndex.refresh();
     }
 
     private void insertJsonBlob(List<Object> sampleJson, EntityIndexBatch batch, String entityType, IndexScope indexScope,final int max,final int startIndex) throws IOException {
@@ -285,7 +286,8 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
         entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
 
-        entityIndex.createBatch().index(indexScope , entity ).executeAndRefresh().get();
+        entityIndex.createBatch().index(indexScope , entity ).execute().get();
+        entityIndex.refresh();
 
         CandidateResults candidateResults = entityIndex.search( indexScope, SearchTypes.fromTypes(entity.getId().getType()),
                 Query.fromQL( "name contains 'Ferrari*'" ) );
@@ -293,7 +295,7 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch batch = entityIndex.createBatch();
         batch.deindex(indexScope, entity).execute().get();
-        batch.executeAndRefresh().get();
+        batch.execute().get();
         entityIndex.refresh();
 
         candidateResults = entityIndex.search( indexScope, SearchTypes.fromTypes(entity.getId().getType()), Query.fromQL( "name contains 'Ferrari*'" ) );
@@ -437,7 +439,8 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope,  user );
         user.setField( new StringField( "address3", "apt 508" ) );
         batch.index( indexScope,  user);
-        batch.executeAndRefresh().get();
+        batch.execute().get();
+        entityIndex.refresh();
 
         CandidateResults results = entityIndex.getEntityVersions(indexScope,  user.getId() );
 
@@ -476,7 +479,8 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = ei.createBatch();
 
         batch.index( appScope, user);
-        batch.executeAndRefresh().get();
+        batch.execute().get();
+        ei.refresh();
 
         Query query = new Query();
         query.addEqualityFilter( "username", "edanuff" );
@@ -484,7 +488,8 @@ public class EntityIndexTest extends BaseIT {
         assertEquals( user.getId(), r.get( 0 ).getId() );
 
         batch.deindex(appScope, user.getId(), user.getVersion() );
-        batch.executeAndRefresh().get();
+        batch.execute().get();
+        ei.refresh();
 
         // EntityRef
         query = new Query();
@@ -544,7 +549,8 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
         batch.index( appScope, fred);
 
-        batch.executeAndRefresh().get();
+        batch.execute().get();
+        ei.refresh();
 
         final SearchTypes searchTypes = SearchTypes.fromTypes( "user" );