You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/04/02 23:23:20 UTC

[3/3] usergrid git commit: Switch DISTRIBUTED database queueing to not cache in memory by default, as the in-memory impl causes duplicate message processing quite often at the moment.

Switch DISTRIBUTED database queueing to not cache in memory by default, as the in-memory impl causes duplicate message processing quite often at the moment.

 - includes making all the tests work without the in-memory queue fronting the database queue, which really means adding some more delay in tests
 - the tests are actually faster now because the original refreshIndex() created and queried random entities, which took longer in most cases
 - uncommented the checkReceipts function in Notification tests as these are now passing, with an added fix for duplicate receipt creation
 - some logging updates in the distributed database queueing impl (Qakka)
 - increased the default take to 500 from the queue when DISTRIBUTED database queueing is configured ( which is the default now )
 - Notifications Queue Listener thread names now include a random identifier
 - reduced the DISTRIBUTED database queueing default long poll to 1 second from 5 seconds


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d3e988bc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d3e988bc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d3e988bc

Branch: refs/heads/master
Commit: d3e988bcbb7eb417c84cfda7396ec3506521aa37
Parents: 8b63aae
Author: Michael Russo <ru...@google.com>
Authored: Sun Apr 2 16:14:05 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Sun Apr 2 16:14:05 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 44 +---------
 .../asyncevents/AsyncIndexProvider.java         |  5 ++
 .../java/org/apache/usergrid/Application.java   |  4 +-
 .../org/apache/usergrid/CoreApplication.java    | 22 ++---
 .../org/apache/usergrid/CoreITSetupImpl.java    | 20 ++++-
 .../org/apache/usergrid/TestEntityIndex.java    |  1 +
 .../corepersistence/AggregationServiceTest.java | 15 ++--
 .../corepersistence/StaleIndexCleanupTest.java  | 17 ++--
 .../persistence/ApplicationServiceIT.java       |  6 +-
 .../usergrid/persistence/CollectionIT.java      | 74 ++++++++--------
 .../usergrid/persistence/CountingMutatorIT.java |  4 +-
 .../persistence/EntityConnectionsIT.java        | 16 ++--
 .../usergrid/persistence/EntityManagerIT.java   | 20 ++---
 .../org/apache/usergrid/persistence/GeoIT.java  | 32 +++----
 .../persistence/GeoQueryBooleanTest.java        |  4 +-
 .../apache/usergrid/persistence/IndexIT.java    | 18 ++--
 .../usergrid/persistence/PathQueryIT.java       |  7 +-
 .../usergrid/persistence/PermissionsIT.java     |  6 +-
 .../usergrid/persistence/RebuildIndexTest.java  | 26 +++---
 .../cassandra/EntityManagerFactoryImplIT.java   | 13 +--
 .../persistence/query/ConnectionHelper.java     |  2 +-
 .../query/IntersectionTransitivePagingIT.java   |  2 +-
 .../query/IntersectionUnionPagingIT.java        |  2 +-
 .../persistence/query/IteratingQueryIT.java     | 32 +++----
 .../persistence/query/NotSubPropertyIT.java     |  2 +-
 .../persistence/query/ParenthesisProblemIT.java |  2 +-
 .../resources/usergrid-custom-test.properties   |  3 +
 .../usergrid/persistence/qakka/QakkaFig.java    |  4 +-
 .../qakka/core/impl/InMemoryQueue.java          |  4 +-
 .../core/impl/QueueMessageManagerImpl.java      |  9 +-
 .../distributed/actors/QueueActorHelper.java    | 15 +++-
 .../distributed/actors/QueueActorRouter.java    |  2 +-
 .../distributed/actors/ShardAllocator.java      |  4 +-
 .../impl/DistributedQueueServiceImpl.java       | 26 ++++--
 .../distributed/actors/ShardAllocatorTest.java  |  3 +
 .../queue/src/test/resources/qakka.properties   |  5 +-
 .../usergrid/rest/CollectionMetadataIT.java     |  4 +-
 .../apache/usergrid/rest/NotificationsIT.java   | 14 ++-
 .../apache/usergrid/rest/PartialUpdateTest.java |  6 +-
 .../apache/usergrid/rest/SystemResourceIT.java  |  3 +-
 .../rest/applications/ApplicationCreateIT.java  |  2 +-
 .../rest/applications/ApplicationDeleteIT.java  |  6 +-
 .../applications/ApplicationResourceIT.java     | 12 +--
 .../applications/assets/AssetResourceIT.java    | 26 +++---
 .../applications/assets/AwsAssetResourceIT.java | 22 ++---
 .../collection/BrowserCompatibilityTest.java    |  2 +-
 .../collection/CollectionsResourceIT.java       | 70 +++++++--------
 .../collection/DuplicateNameIT.java             |  2 +-
 .../activities/ActivityResourceIT.java          |  8 +-
 .../collection/activities/PutTest.java          |  6 +-
 .../collection/devices/DevicesResourceIT.java   | 12 +--
 .../collection/groups/GroupResourceIT.java      | 39 +++++----
 .../collection/paging/PagingResourceIT.java     | 10 +--
 .../users/ConnectionResourceTest.java           | 22 ++---
 .../collection/users/OwnershipResourceIT.java   | 24 +++---
 .../collection/users/PermissionsResourceIT.java | 58 ++++++-------
 .../collection/users/RetrieveUsersTest.java     |  8 +-
 .../collection/users/UserResourceIT.java        | 89 ++++++++++----------
 .../applications/events/EventsResourceIT.java   |  6 +-
 .../applications/queries/BasicGeoTests.java     |  4 +-
 .../applications/queries/GeoPagingTest.java     | 13 +--
 .../applications/queries/MatrixQueryTests.java  |  5 +-
 .../rest/applications/queries/OrderByTest.java  |  6 +-
 .../applications/queries/QueryTestBase.java     |  2 +-
 .../queries/SelectMappingsQueryTest.java        | 14 +--
 .../usergrid/rest/management/AccessTokenIT.java |  8 +-
 .../usergrid/rest/management/AdminUsersIT.java  | 33 ++++----
 .../rest/management/ExportResourceIT.java       |  2 +-
 .../rest/management/ImportResourceIT.java       | 14 +--
 .../rest/management/ManagementResourceIT.java   | 18 ++--
 .../rest/management/OrganizationsIT.java        | 16 ++--
 .../rest/management/RegistrationIT.java         |  4 +-
 .../rest/test/resource/AbstractRestIT.java      | 12 ++-
 .../resources/usergrid-custom-test.properties   |  7 +-
 .../services/notifications/QueueListener.java   | 15 ++--
 .../services/notifications/gcm/GCMAdapter.java  |  3 +-
 .../org/apache/usergrid/ServiceApplication.java |  2 +-
 .../apache/usergrid/management/EmailFlowIT.java |  2 +-
 .../org/apache/usergrid/management/RoleIT.java  |  5 +-
 .../usergrid/services/CollectionServiceIT.java  |  2 +-
 .../usergrid/services/GroupServiceIT.java       |  5 +-
 .../usergrid/services/ServiceInvocationIT.java  |  6 +-
 .../AbstractServiceNotificationIT.java          | 11 +--
 .../apns/NotificationsServiceIT.java            | 40 +++++----
 .../gcm/NotificationsServiceIT.java             | 15 ++--
 .../resources/usergrid-custom-test.properties   | 15 ++--
 86 files changed, 605 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1071842..cdb4fc7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -3089,52 +3089,16 @@ public class CpEntityManager implements EntityManager {
         managerCache.getEntityIndex(applicationScope).addIndex(newIndexName, shards, replicas, writeConsistency);
     }
 
+
     @Override
     public void initializeIndex(){
         managerCache.getEntityIndex(applicationScope).initialize();
     }
-    /**
-     * TODO, these 3 methods are super janky.  During refactoring we should clean this model up
-     */
-    public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
-        try {
-            long start = System.currentTimeMillis();
-            // refresh special indexes without calling EntityManager refresh because stack overflow
-            Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>();
-            map.put("some prop", "test");
-            boolean hasFinished = false;
-            Entity refreshEntity = create("refresh", map);
-            EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo
-                = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
 
-            try {
-                for (int i = 0; i < 20; i++) {
-                    if (searchCollection(
-                        new SimpleEntityRef(
-                            org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()),
-                        InflectionUtils.pluralize("refresh"),
-                        Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'")
-                    ).size() > 0
-                        ) {
-                        hasFinished = true;
-                        break;
-                    }
-                    int sleepTime = 500;
-                    logger.info("Sleeping {} ms during refreshIndex", sleepTime);
-                    Thread.sleep(sleepTime);
 
-                    indexRefreshCommandInfo
-                        = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
-                }
-                if(!hasFinished){
-                    throw new RuntimeException("Did not find entity {} during refresh. uuid->"+refreshEntity.getUuid());
-                }
-            }finally {
-                delete(refreshEntity);
-            }
-            Thread.sleep(100);
-
-            return indexRefreshCommandInfo;
+    public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
+        try {
+            return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
         } catch (Exception e) {
             throw new RuntimeException("refresh failed",e);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 81960f5..2ba6c0b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -36,6 +36,7 @@ import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 
+import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.DISTRIBUTED;
 import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL;
 
 
@@ -121,6 +122,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
             asyncEventService.MAX_TAKE = 1000;
         }
 
+        if ( impl.equals( DISTRIBUTED )) {
+            asyncEventService.MAX_TAKE = 500;
+        }
+
         return asyncEventService;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/Application.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/Application.java b/stack/core/src/test/java/org/apache/usergrid/Application.java
index 378a4f7..102ee9c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/Application.java
+++ b/stack/core/src/test/java/org/apache/usergrid/Application.java
@@ -152,7 +152,9 @@ public interface Application extends TestRule {
 
     public void remove( EntityRef entityRef ) throws Exception;
 
-    public void refreshIndex();
+    public void waitForQueueDrainAndRefreshIndex(int waitTimeMillis);
+
+    public void waitForQueueDrainAndRefreshIndex();
 
     /**
      * Get the entity manager

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 9046f02..f505ead 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -181,8 +181,8 @@ public class CoreApplication implements Application, TestRule {
 
         logger.info( "Created new application {} in organization {}", appName, orgName );
 
-//        //wait for the index before proceeding
-//        em.refreshIndex();
+        //wait for the index before proceeding
+        waitForQueueDrainAndRefreshIndex(500);
 
     }
 
@@ -223,19 +223,21 @@ public class CoreApplication implements Application, TestRule {
         return em.get( new SimpleEntityRef( type, id ) );
     }
 
-
     @Override
-    public synchronized void refreshIndex() {
-        //Insert test entity and find it
-        setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid());
-
-        if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
-            setup.getEmf().refreshIndex(em.getApplicationId());
+    public synchronized void waitForQueueDrainAndRefreshIndex(int waitTimeMillis) {
+        try{
+            Thread.sleep(waitTimeMillis);
+        } catch (InterruptedException e ){
+            //noop
         }
-
         em.refreshIndex();
     }
 
+    @Override
+    public synchronized void waitForQueueDrainAndRefreshIndex() {
+        waitForQueueDrainAndRefreshIndex(750);
+    }
+
 
     @Override
     public EntityManager getEntityManager() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
index 64b001c..bd6ae3e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
@@ -156,16 +156,28 @@ public class CoreITSetupImpl implements CoreITSetup, TestEntityIndex {
     @Override
     public void refresh(UUID appId){
         try {
-            Thread.sleep(50);
-        } catch (InterruptedException ie){
 
+            Thread.sleep(125);
+
+        } catch (InterruptedException ie){
+            //noop
         }
+
         emf.refreshIndex(appId);
 
+    }
+
+    @Override
+    public void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis){
         try {
-            Thread.sleep(50);
-        } catch (InterruptedException ie){
 
+            Thread.sleep(waitTimeMillis);
+
+        } catch (InterruptedException ie){
+            //noop
         }
+
+        emf.refreshIndex(appId);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
index 7da187a..e5e979e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
+++ b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
@@ -26,4 +26,5 @@ import java.util.UUID;
  */
 public interface TestEntityIndex {
     void refresh(UUID appId);
+    void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
index 9f1c9a4..55ce26e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
@@ -48,8 +48,8 @@ public class AggregationServiceTest extends AbstractCoreIT {
         props.put("name", "myname");
         Entity entity1 = this.app.getEntityManager().create("test", props);
         Entity entity2 = this.app.getEntityManager().create("test2", props);
-        this.app.refreshIndex();
-        Thread.sleep(500);
+
+        this.app.waitForQueueDrainAndRefreshIndex(500);
 
         long sum = aggregationService.getApplicationSize(applicationScope);
 
@@ -57,23 +57,24 @@ public class AggregationServiceTest extends AbstractCoreIT {
         Assert.assertTrue(sum > (entity1.getSize() + entity2.getSize()));
 
         long sum1 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
-        Assert.assertEquals(sum1, entity1.getSize());
+        Assert.assertEquals(entity1.getSize(), sum1);
 
         long sum2 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "test2s"));
-        Assert.assertEquals(sum2, entity2.getSize());
+        Assert.assertEquals(entity2.getSize(), sum2);
 
         props = new HashMap<>();
         props.put("test", 1234);
         props.put("name", "myname2");
         Entity entity3 = this.app.getEntityManager().create("test", props);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex(500);
+
         long sum3 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
-        Assert.assertEquals(sum3, entity1.getSize() + entity3.getSize());
+        Assert.assertEquals(entity1.getSize() + entity3.getSize(), sum3);
 
         Map<String,Long> sumEach = aggregationService.getEachCollectionSize(applicationScope);
         Assert.assertTrue(sumEach.containsKey("tests") && sumEach.containsKey("test2s"));
-        Assert.assertEquals(sum3, (long) sumEach.get("tests"));
+        Assert.assertEquals((long) sumEach.get("tests"), sum3);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index abe2615..0abd7a2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -105,7 +104,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         Entity thing = em.create("thing", new HashMap<String, Object>() {{
             put("name", "thing1");
         }});
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
         assertEquals(1, queryCollectionCp("things", "thing", "select *").size());
@@ -116,7 +115,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         em.updateProperties(thing, new HashMap<String, Object>() {{
             put("stuff", "widget");
         }});
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing);
@@ -161,7 +160,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             }}));
             Thread.sleep( writeDelayMs );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         CandidateResults crs = queryCollectionCp( "things", "thing", "select *");
         Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
@@ -210,7 +209,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         Thread.sleep(250); // delete happens asynchronously, wait for some time
 
         //refresh the app index
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(250); // refresh happens asynchronously, wait for some time
 
@@ -231,7 +230,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                }
             });
             //refresh the app index
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
 
             crs = queryCollectionCp("things", "thing", "select *");
 
@@ -265,7 +264,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                 put("name", dogName);
             }}));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         CandidateResults crs = queryCollectionCp( "dogs", "dog", "select *");
         Assert.assertEquals("Expect no stale candidates yet", numEntities, crs.size());
@@ -288,7 +287,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             }
 
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // wait for indexes to be cleared for the deleted entities
         count = 0;
@@ -296,7 +295,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         do {
             //trigger the repair
             queryCollectionEm("dogs", "select * order by created");
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
             crs = queryCollectionCp("dogs", "dog", "select *");
         } while ( crs.size() != numEntities && count++ < 15 );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index 9ad90eb..547691f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
 import com.google.inject.Injector;
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.service.AggregationService;
 import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -32,7 +31,6 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.junit.Assert;
@@ -65,7 +63,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             map.put("somekey", UUID.randomUUID());
            Entity entity = entityManager.create("tests", map);
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(500);
         ApplicationScope appScope  = CpNamingUtils.getApplicationScope(entityManager.getApplicationId());
         Observable<Id> ids =
@@ -76,7 +74,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             this.app.getApplicationService().deleteAllEntities(appScope, 5);
         count = ids.count().toBlocking().last();
         Assert.assertEquals(count, 5);
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         Injector injector = SpringResource.getInstance().getBean(Injector.class);
         GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class);
         GraphManager graphManager = factory.createEdgeManager(appScope);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 3305e0e..f484f4f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -132,7 +132,7 @@ public class CollectionIT extends AbstractCoreIT {
         activity3 = app.get( activity3.getUuid(), activity3.getType() );
         app.addToCollection( user, "activities", activity3 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // empty query
         Query query = new Query();
@@ -259,7 +259,7 @@ public class CollectionIT extends AbstractCoreIT {
         activity3 = app.get(activity3.getUuid(), activity3.getType());
         app.addToCollection(user, "activities", activity3);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // empty query
         Query query = new Query();
@@ -295,7 +295,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -315,7 +315,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.update( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // search with the old username, should be no results
         query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -354,7 +354,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "middlename = '" + middleName + "'" );
@@ -386,7 +386,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "lastname = '" + lastName + "'" );
@@ -434,7 +434,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put("nickname", "ed");
         em.updateProperties(user1, properties);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         final Query query = Query.fromQL( "nickname = 'ed'" );
@@ -469,7 +469,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity group = em.create( "group", properties );
         assertNotNull( group );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "name = '" + groupName + "'" );
@@ -501,7 +501,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity group = em.create( "group", properties );
         assertNotNull( group );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
 
@@ -559,7 +559,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.addToCollection( user, "activities", em.create( "activity", properties ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Query query = Query.fromQL( "verb = 'post'" );
 
@@ -593,7 +593,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = new Query();
@@ -636,7 +636,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = new Query();
@@ -677,7 +677,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "orquerygame", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = Query
@@ -756,7 +756,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = Query.fromQL(
@@ -817,7 +817,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // simple not
         Query query = Query.fromQL( "select * where NOT keywords contains 'game'" );
@@ -893,7 +893,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity entity2 = em.create( "game", properties );
         assertNotNull( entity2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
 
         // search for games without sub-field Foo should returned zero entities
@@ -949,7 +949,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put("keywords", "Action, New");
         em.create( "game", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * where keywords contains 'hot' or title contains 'hot'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -980,7 +980,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put( "keywords", "Action, New" );
         Entity thirdGame = em.create( "game", properties );
 
-        app.refreshIndex();//need to track all batches then resolve promises
+        app.waitForQueueDrainAndRefreshIndex();//need to track all batches then resolve promises
 
         Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -1011,7 +1011,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = new Query();
         query.setLimit( 50 );
@@ -1039,7 +1039,7 @@ public class CollectionIT extends AbstractCoreIT {
             numDeleted++;
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // wait for indexes to be cleared
         Thread.sleep(1000); //TODO find why we have to wait.  This is a bug
@@ -1096,7 +1096,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         int pageSize = 10;
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         final Query query = Query.fromQL( "index < " + size * 2 + " order by index asc" );
 
         Results r = null;
@@ -1147,7 +1147,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         int pageSize = 10;
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * where index >= " + size / 2 + " sort by index asc" );
         query.setLimit( pageSize );
@@ -1201,7 +1201,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int pageSize = 10;
 
@@ -1254,7 +1254,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int pageSize = 5;
 
@@ -1310,7 +1310,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", root );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "rootprop1 = 'simpleprop'" );
         Entity entity;
@@ -1357,7 +1357,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", jsonData );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "intprop = 10" );
 
@@ -1416,7 +1416,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", props );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "myString = 'My simple string'" );
 
@@ -1441,7 +1441,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select username, email where username = 'edanuff'";
         Query query = Query.fromQL( s );
@@ -1471,7 +1471,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select {name: username, email: email} where username = 'edanuff'";
         Query query = Query.fromQL( s );
@@ -1503,7 +1503,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         final Entity entity = em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select * where username = 'ed@anuff.com'";
         Query query = Query.fromQL( s );
@@ -1525,7 +1525,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.createConnection( foo, "testconnection", entity );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now query via the testConnection, this should work
 
@@ -1569,7 +1569,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "loveobject", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         location = new LinkedHashMap<String, Object>();
         location.put( "Place", "Via Pietro Maroncelli, 48, 62012 Santa Maria Apparente Province of Macerata, Italy" );
@@ -1587,7 +1587,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "loveobject", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // String s = "select * where Flag = 'requested'";
         // String s = "select * where Flag = 'requested' and NOT Recipient.Username =
@@ -1632,7 +1632,7 @@ public class CollectionIT extends AbstractCoreIT {
             createdEntities.add( created );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getCollection( em.getApplicationRef(), "users", null, 50, Level.ALL_PROPERTIES, false );
 
@@ -1729,7 +1729,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();
@@ -1763,7 +1763,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();
@@ -1797,7 +1797,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity createUser2 = em.create( user2 );
         assertNotNull( createUser2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
index 63c7cb8..596ec7c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
@@ -74,7 +74,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
         properties.put( "username", "testuser" );
         properties.put( "email", "test@foo.bar" );
         Entity created = em.create( "user", properties );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Entity returned = em.get( created.getUuid() );
 
@@ -89,7 +89,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
 
 
             Entity connectedEntity = em.create( "user", connectedProps );
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
 
             // Connect from our new entity to our root one so it's updated when paging
             em.createConnection( connectedEntity, "following", returned );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
index 296bf53..e1e24c4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
@@ -64,7 +64,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         assertEquals( 1, connectionTypes.size());
         assertEquals("likes", connectionTypes.iterator().next());
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.IDS);
 
@@ -128,7 +128,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catB.getUuid() + "\n" );
         em.createConnection( awardA, "awarded", catB );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // List forward connections for cat A
 
@@ -149,7 +149,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catA.getUuid() + "\n" );
         em.createConnection( awardA, "awarded", catA );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // List forward connections for cat A
 // Not valid with current usages
@@ -256,7 +256,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 
         em.createConnection( secondUserEntity, "likes", arrogantbutcher );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", "restaurant", Level.IDS);
 
@@ -310,7 +310,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 
         em.createConnection( fredEntity, "likes", wilmaEntity );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
 //        // search for "likes" edges from fred
 //        assertEquals( 1,
@@ -363,7 +363,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         em.createConnection( fredEntity, "likes", JohnEntity );
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now query via the testConnection, this should work
 
@@ -410,7 +410,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         }
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.ALL_PROPERTIES) ;
 
@@ -453,7 +453,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 //
 //        em.createConnection( fredEntity, "likes", wilmaEntity );
 //
-//        app.refreshIndex();
+//        app.waitForQueueDrainAndRefreshIndex();
 //
 ////        // search for "likes" edges from fred
 ////        assertEquals( 1,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
index cb3a728..e1e4a05 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
@@ -75,7 +75,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username" ) );
         assertEquals( "user.email not expected value", "ed@anuff.com", user.getProperty( "email" ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         EntityRef userRef = em.getAlias( new SimpleEntityRef( "application", applicationId ), "users", "edanuff" );
 
@@ -274,13 +274,13 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity thing = em.create( "thing", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         logger.info( "Starting entity delete" );
         em.delete( thing );
         logger.info( "Entity deleted" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now search by username, no results should be returned
 
@@ -310,13 +310,13 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         logger.info( "Starting entity delete" );
         em.delete( user );
         logger.info( "Entity deleted" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now search by username, no results should be returned
 
@@ -335,7 +335,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         user = em.create( "user", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Query userNameQuery = Query.fromQL( "username = '" + name + "'" );
 
@@ -456,7 +456,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         EntityRef appRef = em.get( new SimpleEntityRef( "application", app.getId() ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getCollection( appRef, "things", null, 10, Level.ALL_PROPERTIES, false );
 
@@ -548,7 +548,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         Entity createdDevice = em.createItemInCollection( createdUser, "devices", "device", device );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Entity returnedDevice = em.get( new SimpleEntityRef( "device", createdDevice.getUuid() ) );
 
@@ -580,7 +580,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity user = em.create( "robot", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( em.get( user.getUuid() ) );
     }
@@ -608,7 +608,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         em.addToCollection(appRef, "fluffies", entityRef);
         em.addToCollection(appRef, "fluffies", entityRef);
 
-        //app.refreshIndex();
+        //app.waitForQueueDrainAndRefreshIndex();
 
         Results results = em.getCollection(appRef,
             "fluffies", null, 10, Level.ALL_PROPERTIES, true);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
index b7d708e..df77084 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
@@ -96,7 +96,7 @@ public class GeoIT extends AbstractCoreIT {
         assertNotNull(hotel);
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //2. Query with a globally large distance to verify location
 
@@ -142,7 +142,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //2. Query with a globally large distance to verify location
         Query query = Query.fromQL("select * where location within " + Integer.MAX_VALUE + " of 0, 0");
@@ -154,7 +154,7 @@ public class GeoIT extends AbstractCoreIT {
         user.getDynamicProperties().remove("location");
         em.updateProperties(user, properties);
         em.update(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //4. Repeat the query, expecting no results
         listResults = em.searchCollection(em.getApplicationRef(), "users", query);
@@ -188,7 +188,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final double lat = 37.776753;
         final double lon = -122.407846;
@@ -234,7 +234,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final double lat = 37.776753;
         final double lon = -122.407846;
@@ -284,12 +284,12 @@ public class GeoIT extends AbstractCoreIT {
 
         Entity user = em.create("user", userProperties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //3. Create a connection between the user and the entity
         em.createConnection(user, "likes", restaurant);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //4. Test that the user is within 2000m of the entity
         Results emSearchResults = em.searchTargetEntities(user,
             Query.fromQL("location within 5000 of "
@@ -326,7 +326,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
             logger.debug("Entity {} created", entity.getProperty("name"));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //2. validate the size of the result
         Query query = new Query();
         Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -367,7 +367,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
             logger.debug("Entity {} created", entity.getProperty("name"));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //2. validate the size of the result
         Query query = new Query();
         Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -540,7 +540,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
          // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
         // just to be save
@@ -596,7 +596,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(2000);
 
         // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
@@ -669,7 +669,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
          // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
         // just to be save
@@ -729,7 +729,7 @@ public class GeoIT extends AbstractCoreIT {
             created.add(e);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int startDelta = size - min;
 
@@ -794,7 +794,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //do a direct geo iterator test.  We need to make sure that we short circuit on the correct tile.
 
@@ -838,7 +838,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
         }
         //3. refresh the index
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //4. return the entity manager
         return em;
     }
@@ -857,7 +857,7 @@ public class GeoIT extends AbstractCoreIT {
         latlong.put("longitude", longitude);
 
         em.setProperty(entity, "location", latlong);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
index 9a3f5a6..609f977 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
@@ -79,7 +79,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // define center point about 300m from that location
         final double lat = 37.774277;
@@ -158,7 +158,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
         Entity userFred = em.create( "user", properties );
         assertNotNull( userFred );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // define center point about 300m from that location
         final double lat = 37.774277;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
index d62f88e..5933b57 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
@@ -60,7 +60,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int i = 0;
 
@@ -133,7 +133,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "name < 'delta' order by name asc" );
         Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -261,7 +261,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "group = 1 order by name desc" );
         Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -290,7 +290,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.create("names", entity1);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL("select status where status = 'pickled'");
@@ -338,7 +338,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.createConnection( entity2Ref, "connecting", entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -357,7 +357,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.update( entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //query and check the status has been updated, shouldn't return results
         query = Query.fromQL( "select * where status = 'pickled'" );
@@ -413,7 +413,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.createConnection( entity2Ref, "connecting", entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -432,7 +432,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.update( entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //query and check the status has been updated, shouldn't return results
         query = Query.fromQL( "select * where status = 'pickled'" );
@@ -500,7 +500,7 @@ public class IndexIT extends AbstractCoreIT {
         }};
         em.create("names", entity2);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // simple single-field select mapping
         {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
index e6ecf97..329a5be 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Query.Level;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -63,7 +62,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
 
@@ -135,7 +134,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // pick an arbitrary group, ensure it has 7 users
         Results ru = em.getCollection( groups.get( 2 ), "users", null, 20, Level.IDS, false );
@@ -152,7 +151,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // pick an arbitrary user, ensure it has 7 devices
         Results rd = em.getCollection( users.get( 6 ), "devices", null, 20, Level.IDS, false );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
index 11f0692..1072d29 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
@@ -28,8 +28,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.RandomStringUtils;
-
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.entities.Role;
 import org.apache.usergrid.persistence.Query.Level;
@@ -147,7 +145,7 @@ public class PermissionsIT extends AbstractCoreIT {
         dump( "group roles", roles );
 
         em.deleteGroupRole( group.getUuid(), "author" );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         roles = em.getGroupRoles( group.getUuid() );
@@ -156,7 +154,7 @@ public class PermissionsIT extends AbstractCoreIT {
 
         em.addUserToGroupRole( user.getUuid(), group.getUuid(), "admin" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Results r = em.getUsersInGroupRole( group.getUuid(), "admin", Level.ALL_PROPERTIES );
         dump( "entities", r.getEntities() );
         assertEquals( "proper number of users in group role not set", 1, r.size() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index 383d620..a7759de 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -125,7 +125,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
         }
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -163,6 +163,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         waitForRebuild( status, reIndexService );
 
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read the catherder collection and not the catshepard
 
@@ -233,7 +234,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
         }
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(15000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -247,6 +248,8 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         deleteIndex( em.getApplicationId() );
 
+        app.waitForQueueDrainAndRefreshIndex();
+
         // ----------------- test that we can read them, should fail
 
         // deleting sytem app index will interfere with other concurrently running tests
@@ -283,7 +286,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -292,7 +294,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(15000);
         readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
     }
 
@@ -343,7 +345,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -392,7 +394,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -401,7 +402,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(5000);
         results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 3 );
         assertEquals(results.size(),3);
         q = Query.fromQL("select * where location within 100 of "+lat+", "+lon);
@@ -435,7 +436,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         final Entity secondEntity = em.create( "thing",  entityData);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -493,7 +494,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -502,7 +502,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(5000);
         countEntities( em, collectionName, 1 );
     }
 
@@ -547,14 +547,14 @@ public class RebuildIndexTest extends AbstractCoreIT {
         );
 
         ei.deleteApplication().toBlocking().lastOrDefault( null );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
     }
 
 
     private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
         throws Exception {
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
         Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
@@ -593,7 +593,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
     private int countEntities( EntityManager em, String collectionName, int expectedEntities)
            throws Exception {
 
-           app.refreshIndex();
+           app.waitForQueueDrainAndRefreshIndex();
 
            Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
            Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index d287d7e..3652b6f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -26,11 +26,8 @@ import java.util.UUID;
 import org.apache.usergrid.Application;
 import org.apache.usergrid.CoreApplication;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
 import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.utils.UUIDUtils;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,8 +40,6 @@ import org.apache.usergrid.persistence.cassandra.util.TraceTagManager;
 import org.apache.usergrid.persistence.cassandra.util.TraceTagReporter;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.setup.ConcurrentProcessSingleton;
-import rx.functions.Func0;
-import rx.functions.Func1;
 import rx.functions.Func2;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -140,7 +135,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
             Thread.sleep( 500 );
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
 
         // wait for it to appear in delete apps list
@@ -164,7 +159,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
         // delete the application
         setup.getEmf().deleteApplication(deletedAppId);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         found = findApps.call( deletedAppId, emf.getDeletedApplications() );
 
@@ -196,14 +191,14 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
             }
         }while (status.getStatus()!= ReIndexService.Status.COMPLETE);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         // test to see that app now works and is happy
 
         // it should not be found in the deleted apps collection
         found = findApps.call( deletedAppId, emf.getDeletedApplications());
         assertFalse("Restored app found in deleted apps collection", found);
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         apps = setup.getEmf().getApplications();
         found = findApps.call(deletedAppId, apps);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
index e5c84f8..1f53c0a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
@@ -76,7 +76,7 @@ public class ConnectionHelper extends CollectionIoHelper {
     @Override
     public Results getResults( Query query ) throws Exception {
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         query.setConnectionType( CONNECTION );
         query.setEntityType( "test" );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
index cea3b35..dcc8eae 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
@@ -131,7 +131,7 @@ public class IntersectionTransitivePagingIT{
 
 
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
         return expected;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
index 4d60164..3403dc8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
@@ -135,7 +135,7 @@ public class IntersectionUnionPagingIT {
 
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
index b2003fe..dac3f68 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
@@ -298,7 +298,7 @@ public class IteratingQueryIT {
             //we have to sleep, or we kill embedded cassandra
 
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
         long stop = System.currentTimeMillis();
 
@@ -367,7 +367,7 @@ public class IteratingQueryIT {
                 expected.add( name );
             }
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -438,7 +438,7 @@ public class IteratingQueryIT {
             }
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -551,7 +551,7 @@ public class IteratingQueryIT {
                 expectedResults.add( name );
             }
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -623,7 +623,7 @@ public class IteratingQueryIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -689,7 +689,7 @@ public class IteratingQueryIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -742,7 +742,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -803,7 +803,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -865,7 +865,7 @@ public class IteratingQueryIT {
             expected.add( name );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -924,7 +924,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(500);
         long stop = System.currentTimeMillis();
@@ -987,7 +987,7 @@ public class IteratingQueryIT {
             expected.add( name );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -1050,7 +1050,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -1110,7 +1110,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -1216,7 +1216,7 @@ public class IteratingQueryIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * order by boolean desc, index asc" );
         query.setLimit( queryLimit );
@@ -1322,7 +1322,7 @@ public class IteratingQueryIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query =
             Query.fromQL( "select * where intersect = true OR intersect2 = true order by created, intersect desc" );
@@ -1384,7 +1384,7 @@ public class IteratingQueryIT {
 
             io.writeEntity( entity );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
index f7308da..3f5573f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
@@ -132,7 +132,7 @@ public class NotSubPropertyIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         return expected;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
index 60c1622..89641a8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
@@ -72,7 +72,7 @@ public class ParenthesisProblemIT extends AbstractCoreIT {
             put("age",1);
         }});
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Results entities = em.searchCollection( em.getApplicationRef(), "cats", Query.fromQL(query));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index c544967..8f9058d 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -49,6 +49,9 @@ collection.uniquevalues.authoritative.region=us-east
 # Queueing Test Settings
 # Reduce the long polling time for the tests
 queue.long.polling.time.millis=50
+elasticsearch.worker_count=8
+elasticsearch.worker_count_utility=8
+queue.get.timeout.seconds=8
 
 # --- End: Usergrid cluster/actor system settings
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 061807b..778274e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -165,7 +165,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
     long getMaxShardSize();
 
     @Key(QUEUE_LONG_POLL_TIME_MILLIS)
-    @Default("5000")
+    @Default("1000")
     long getLongPollTimeMillis();
 
     /** Max time-to-live for queue message and payload data */
@@ -174,7 +174,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
     int getMaxTtlSeconds();
 
     @Key(QUEUE_IN_MEMORY)
-    @Default("true")
+    @Default("false") // in memory not ready yet; leave this to false else msgs could be processed more than once
     boolean getInMemoryCache();
 
     @Key(QUEUE_IN_MEMORY_REFRESH_ASYNC)

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 09bb8de..fa5ee0b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -59,7 +59,7 @@ public class InMemoryQueue {
         }
     }
 
-    public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
+    synchronized public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
 
         UUID newest = newestByQueueName.get( queueName );
         if ( newest == null ) {
@@ -76,7 +76,7 @@ public class InMemoryQueue {
         getQueue( queueName ).add( databaseQueueMessage );
     }
 
-    public UUID getNewest( String queueName ) {
+    synchronized public UUID getNewest( String queueName ) {
         if ( getQueue( queueName ).isEmpty() ) {
             newestByQueueName.remove( queueName );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index ac2857f..fd4257b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 
 @Singleton
@@ -188,7 +189,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
                         queueMessage.setData( json );
 
                     } catch (UnsupportedEncodingException e) {
-                        logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e);
+                        logger.error("Error decoding data for messageId=" + queueMessage.getMessageId(), e);
                     }
                 } else {
                     try {
@@ -201,6 +202,12 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
                 }
 
                 queueMessages.add( queueMessage );
+            } else if ( (System.currentTimeMillis() - dbMessage.getQueuedAt()) > TimeUnit.HOURS.toMillis(2) ) {
+                logger.warn("Queue Message does not have corresponding data after 2 hours, removing from queue - " +
+                    "queueName: {}, region: {}, queueMessageId: {}", dbMessage.getQueueName(), dbMessage.getRegion(),
+                    dbMessage.getQueueMessageId());
+                queueMessageSerialization.deleteMessage(dbMessage.getQueueName(), dbMessage.getRegion(),
+                    dbMessage.getShardId(), dbMessage.getType(), dbMessage.getQueueMessageId());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 89c79ec..eb26b69 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -177,7 +178,7 @@ public class QueueActorHelper {
             }
         }
 
-        newestFetchedUuid.put( queueName, since );
+        updateUUIDPointer(queueName, since);
 
 //        Shard currentShard = multiShardIterator.getCurrentShard();
 //        if ( currentShard != null ) {
@@ -279,7 +280,7 @@ public class QueueActorHelper {
     }
 
 
-    void queueRefresh( String queueName ) {
+    synchronized void queueRefresh( String queueName ) {
 
         Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
 
@@ -327,7 +328,7 @@ public class QueueActorHelper {
 
                 startingShards.put( shardKey, shardId );
 
-                lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+                updateLastRefreshedTime(queueName);
 
                 if ( count > 0 ) {
                     Object shard = shardIdOptional.isPresent() ? shardIdOptional.get() : "null";
@@ -346,4 +347,12 @@ public class QueueActorHelper {
         return queueName + "_" + type + region;
     }
 
+    private synchronized void updateUUIDPointer(String queueName, UUID newUUIDPointer){
+        newestFetchedUuid.put( queueName, newUUIDPointer );
+    }
+
+    private synchronized void updateLastRefreshedTime(String queueName){
+        lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 1ff8502..cbc7245 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -133,7 +133,7 @@ public class QueueActorRouter extends UntypedActor {
                     getContext().dispatcher(),
                     getSelf() );
                 shardAllocationSchedulersByQueueName.put( queueName, scheduler );
-                logger.debug( "Created shard allocater for queue {}", queueName );
+                logger.debug( "Created shard allocator for queue {}", queueName );
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 19059e6..75c1c22 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -139,8 +139,8 @@ public class ShardAllocator extends UntypedActor {
                 shardSerialization.createShard( newShard );
                 shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 );
 
-                logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
-                        this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+                logger.info("Allocated new shard for queue, newShardID: {}, queueName: {}, shardMessageCount: {}, usedPercent: {}%",
+                    newShard.getShardId(), queueName, counterValue, (long)((double)counterValue/(double)qakkaFig.getMaxShardSize()*100) );
 
             } else {
 //                logger.debug("No new shard for queue {} counterValue {} of max {}",