You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/04/02 23:23:19 UTC

[2/3] usergrid git commit: Switch DISTRIBUTED database queueing to default not cache in memory as the in memory impl causes duplicate messgae processing quite often at the moment.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 98e055a..b1f72aa 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.qakka.distributed.impl;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.*;
@@ -42,7 +44,9 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
+import scala.concurrent.Promise;
 
 import java.lang.reflect.Method;
 import java.util.*;
@@ -235,20 +239,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
 
         if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
-            logger.error("Akka Actor System is not ready yet for requests.");
-            return Collections.EMPTY_LIST;
+            logger.warn("Akka Actor System is not ready yet for requests.");
+            return Collections.emptyList();
         }
 
         int maxRetries = qakkaFig.getMaxGetRetries();
         int tries = 0;
 
+        boolean interrupted = false;
+
         QueueGetRequest request = new QueueGetRequest( queueName, count );
         while ( ++tries < maxRetries ) {
             try {
                 Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
 
                 // ask ClientActor and wait (up to timeout) for response
-
                 Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t );
                 Object responseObject = Await.result( fut, t.duration() );
 
@@ -259,8 +264,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                     if ( response != null && response instanceof QueueGetResponse) {
                         QueueGetResponse qprm = (QueueGetResponse)response;
                         if ( qprm.isSuccess() ) {
-                            if (tries > 1) {
-                                logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries );
+                            if (tries > 1 && !interrupted) {
+                                logger.warn( "getNextMessage for queue {} SUCCESS after {} tries", queueName, tries );
                             }
                         }
                         return qprm.getQueueMessages();
@@ -284,10 +289,13 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                 }
 
             } catch ( TimeoutException e ) {
-                logger.trace("TIMEOUT popping to queue " + queueName + " retrying " + tries, e );
-
-            } catch ( Exception e ) {
-                logger.debug("ERROR popping to queue " + queueName + " retrying " + tries, e );
+                logger.warn("TIMEOUT popping queue " + queueName + ", attempt:  " + tries, e );
+            } catch(InterruptedException e){
+                interrupted = true;
+                // this might happen, retry the ask again
+                logger.trace("Thread was marked interrupted so unable to wait for the result, attempt: {}", tries);
+            }catch ( Exception e ) {
+                logger.error("ERROR popping queue " + queueName + ", attempt: " + tries, e );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index 11f3d08..4745cb1 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -181,6 +181,9 @@ public class ShardAllocatorTest extends AbstractAkkaTest {
 
         distributedQueueService.refresh();
 
+        // the shard allocator kicks in when messages are first received
+        distributedQueueService.getNextMessages(queueName,10);
+
         try {
 
             // Create number of messages

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 94bfeff..d77e7e8 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -36,7 +36,8 @@ usergrid.cluster.seeds=us-east:localhost
 # Port used for cluster communications.
 usergrid.cluster.port=3545
 
-queue.inmemory.cache=true
+# In-Memory Queueing Not Ready Yet; Leave this to false else, messages are potentially processed more than once
+queue.inmemory.cache=false
 
 queue.num.actors=50
 queue.sender.num.actors=100
@@ -47,7 +48,7 @@ queue.get.timeout.seconds=5
 
 # set shard size and times low for testing purposes
 queue.shard.max.size=10
-queue.shard.allocation.check.frequency.millis=1000
+queue.shard.allocation.check.frequency.millis=500
 queue.shard.allocation.advance.time.millis=200
 
 # set low for testing purposes

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
index 4e58935..a96d725 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
@@ -63,7 +63,7 @@ public class CollectionMetadataIT extends AbstractRestIT {
         e3 = this.app().collection(collectionName).post(e3);
         assertNotNull(e3);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create connections
         // e1 hates e3
@@ -73,7 +73,7 @@ public class CollectionMetadataIT extends AbstractRestIT {
         // e3 has one in (hates) connection
         this.app().collection(collectionName).entity(e1).connection("hates").entity(e3).post();
         this.app().collection(collectionName).entity(e2).connection("likes").entity(e1).post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // no query param, "all", and invalid param all the same
         checkMetadata(e1, null, "hates", "likes");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
index fa68350..922c678 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
@@ -18,17 +18,15 @@ package org.apache.usergrid.rest;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Slf4jReporter;
-import com.fasterxml.jackson.databind.JsonNode;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.MediaType;
+
 import org.apache.commons.lang3.time.StopWatch;
-import org.apache.usergrid.rest.test.resource.*;
-import org.apache.usergrid.rest.test.resource.endpoints.NamedResource;
 import org.apache.usergrid.rest.test.resource.model.*;
 import org.apache.usergrid.rest.test.resource.model.ApiResponse;
 import org.junit.After;
@@ -85,7 +83,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
 
         String unIndexedCollectionName = "notifications";
         app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create notifier
         Entity notifier = new Entity().chainPut("name", "mynotifier").chainPut("provider", "noop");
@@ -103,7 +101,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
         Token token = this.app().token().post(new Token("ed", "sesame"));
         this.clientSetup.getRestClient().token().setToken(token);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // create devices
         int devicesCount = 0;
@@ -129,7 +127,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
             devicesCount++;
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String postMeterName = getClass().getSimpleName() + ".postNotifications";
         Meter postMeter = registry.meter( postMeterName );
@@ -168,7 +166,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
         }
         registry.remove( postMeterName );
 
-        refreshIndex( );
+        waitForQueueDrainAndRefreshIndex( );
 
         logger.info("Waiting for all notifications to be sent");
         StopWatch sw = new StopWatch();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
index 1067365..9b295f0 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
@@ -61,7 +61,7 @@ public class PartialUpdateTest extends AbstractRestIT {
         String uuid = userNode.get("uuid").toString();
         assertNotNull(uuid);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String, Object> updateProperties = new LinkedHashMap<String, Object>();
         // update user bart passing only an update to a property
@@ -81,7 +81,7 @@ public class PartialUpdateTest extends AbstractRestIT {
                 fail("Update failed due to: " + uie.getResponse().readEntity(String.class));
             }
 
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
             // retrieve the user from the backend
             userNode = this.app().collection("users").entity(userNode).get();
 
@@ -123,7 +123,7 @@ public class PartialUpdateTest extends AbstractRestIT {
         } catch (ClientErrorException uie) {
             fail("Update failed due to: " + uie.getResponse().readEntity(String.class));
         }
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         userNode = this.app().collection("users").entity(userNode).get();
         assertNotNull(userNode);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
index dd2a733..383c046 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
@@ -24,7 +24,6 @@ import org.junit.Test;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -57,7 +56,7 @@ public class SystemResourceIT extends AbstractRestIT {
         for(int i =0; i<count;i++) {
             this.app().collection("tests").post(new Entity().chainPut("testval", "test"));
         }
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         QueryParameters queryParameters = new QueryParameters();
         queryParameters.addParam( "access_token", clientSetup.getSuperuserToken().getAccessToken() );
         queryParameters.addParam("confirmApplicationName", this.clientSetup.getAppName());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
index 0f9be30..a6d987b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
@@ -104,7 +104,7 @@ public class ApplicationCreateIT extends AbstractRestIT {
             .management().orgs().org( orgName ).app().post( new Application( appName ) );
         UUID appId = appCreateResponse.getEntities().get(0).getUuid();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         for ( int i=0; i<5; i++ ) {
 
             final String entityName = "entity" + i;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
index 6416cff..6521444 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
@@ -213,7 +213,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         // test that we cannot see the application in the list of applications returned
         // by the management resource's get organization's applications end-point
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         ManagementResponse orgAppResponse = clientSetup.getRestClient()
             .management().orgs().org( orgName ).apps().getOrganizationApplications();
@@ -295,7 +295,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
             .request()
             .delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         // restore the app
@@ -308,7 +308,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
             .request()
             .put( javax.ws.rs.client.Entity.entity( "", MediaType.APPLICATION_JSON )); // must send body
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // test that we can see the application in the list of applications
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
index 9f4f8aa..8dabf93 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
@@ -181,7 +181,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
             }
             logger.info( "Waiting for app to become available" );
             Thread.sleep(500);
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
         }
         assertNotNull( clientId );
         assertNotNull( clientSecret );
@@ -242,7 +242,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
 
         assertNotNull(entity);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //retrieve the app using a username and password
         QueryParameters params = new QueryParameters()
@@ -354,7 +354,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("users").post(user);
         //assert that it was saved correctly
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //add a ttl to the entity that is greater than the maximum
         entity.chainPut("grant_type", "password").chainPut("ttl", Long.MAX_VALUE);
@@ -392,7 +392,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         //save the entity
         Entity entity = this.app().collection("users").post(user);
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Retrieve an authentication token for the user, setting the TTL
         Token apiResponse = target().path( String.format( "/%s/%s/token", orgName, appName ) )
@@ -457,7 +457,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         //save the entity
         Entity entity = this.app().collection("users").post(user);
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             //Retrieve a token for the new user, setting the TTL to an invalid value
@@ -496,7 +496,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         //save the entity
         Entity entity = this.app().collection("users").post(user);
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //Retrieve an authentication token for the user
         Token tokenResponse = this.app().getTarget( false ).path( "token" )
             .queryParam( "grant_type", "password" )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
index 144893d..616d929 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
@@ -72,7 +72,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void octetStreamOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //  post an asset entity
 
@@ -113,7 +113,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void verifyMetadataChanged() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post an entity
 
@@ -130,7 +130,7 @@ public class AssetResourceIT extends AbstractRestIT {
             .field( "name", "verifyMetadataChangedTest" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // get entity and check asset metadata
 
@@ -175,7 +175,7 @@ public class AssetResourceIT extends AbstractRestIT {
             .field( "name", "verifyMetadataChangedTest" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //verify that data was correctly written to backend
         getResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).get( ApiResponse.class );
@@ -193,14 +193,14 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPostFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post data larger than 5M
 
         byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/file-bigger-than-5M" ) );
         FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource(getOrgAppPath("foos")).post(form);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         UUID assetId = putResponse.getEntities().get(0).getUuid();
         assertNotNull(assetId);
@@ -234,7 +234,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPutFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post an entity
 
@@ -250,7 +250,7 @@ public class AssetResourceIT extends AbstractRestIT {
             .field( "foo", "bar2" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // get entity and check asset metadata
 
@@ -283,7 +283,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void largeFileInS3() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // upload file larger than 5MB
 
@@ -310,7 +310,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void fileTooLargeShouldResultInError() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // set max file size down to 6mb
 
@@ -354,7 +354,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void deleteConnectionToAsset() throws IOException {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // create the entity that will be the asset, an image
 
@@ -378,7 +378,7 @@ public class AssetResourceIT extends AbstractRestIT {
 
         ApiResponse connectResponse = pathResource(
             getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).post( ApiResponse.class );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify connection from imagegallery to asset
 
@@ -389,7 +389,7 @@ public class AssetResourceIT extends AbstractRestIT {
         // delete the connection
 
         pathResource( getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify that connection is gone
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
index ad12975..4a9bfaa 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
@@ -205,7 +205,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void octetStreamOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //  post an asset entity
 
@@ -246,14 +246,14 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPostFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post data larger than 5M
 
         byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/file-bigger-than-5M" ) );
         FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource(getOrgAppPath("foos")).post(form);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         UUID assetId = putResponse.getEntities().get(0).getUuid();
         assertNotNull(assetId);
@@ -287,7 +287,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPutFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post an entity
 
@@ -303,7 +303,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
             .field( "foo", "bar2" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // get entity and check asset metadata
 
@@ -336,7 +336,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void largeFileInS3() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // upload file larger than 5MB
 
@@ -363,7 +363,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void fileTooLargeShouldResultInError() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // set max file size down to 6mb
         setTestProperty( "usergrid.binary.max-size-mb","6" );
@@ -383,7 +383,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
 
             // attempt to get asset entity, it should contain error
 
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
             ApiResponse getResponse = pathResource( getOrgAppPath( "bars/" +assetId ) ).get( ApiResponse.class );
             Map<String, Object> fileMetadata = (Map<String, Object>)getResponse.getEntities().get(0).get("file-metadata");
             assertNotNull( fileMetadata );
@@ -403,7 +403,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void deleteConnectionToAsset() throws IOException {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // create the entity that will be the asset, an image
 
@@ -427,7 +427,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
 
         ApiResponse connectResponse = pathResource(
             getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).post( ApiResponse.class );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify connection from imagegallery to asset
 
@@ -438,7 +438,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
         // delete the connection
 
         pathResource( getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify that connection is gone
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
index b453ed2..b63400a 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
@@ -69,7 +69,7 @@ public class BrowserCompatibilityTest extends org.apache.usergrid.rest.test.reso
         Entity entity = this.app().collection("things").post(payload);
         assertEquals(entity.get("name"), name);
         String uuid = entity.getAsString("uuid");
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //now get this new entity with "text/html" in the accept header
         Entity returnedEntity = this.app().collection("things").withAcceptHeader(acceptHeader).entity(entity).get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
index d72054a..bf06c21 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
@@ -136,7 +136,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
             fail("This should return a success.");
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         Collection collection = this.app().collection( "testCollections" ).collection( "_settings" ).get();
@@ -159,7 +159,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         Entity thing = this.app().collection( "testCollections" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //The above verifies the test case.
@@ -172,7 +172,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         Entity postedEntity = this.app().collection( "testCollections" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "two ='query'";
@@ -198,11 +198,11 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //next part is to delete the schema then reindex it and it should work.
         this.app().collection( "testCollections" ).collection( "_settings" ).delete();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         this.app().collection( "testCollections" ).collection( "_reindex" )
             .post(true,clientSetup.getSuperuserToken(),ApiResponse.class,null,null,false);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //Do a query to see if you can find the indexed query.
@@ -233,14 +233,14 @@ public class CollectionsResourceIT extends AbstractRestIT {
         Entity payload = new Entity();
         payload.put( "fields", "all");
         app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // post entity with two fields
         Entity testEntity = new Entity();
         testEntity.put( "one", "helper" );
         testEntity.put( "two","query" );
         app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // verify it can be queried on both fields
 
@@ -288,7 +288,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         Entity thing = this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //Reindex and verify that the entity only has field one index.
@@ -339,7 +339,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         Entity thing = this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection collection = this.app().collection( "testCollection" ).collection( "_settings" ).get();
 
@@ -419,7 +419,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create test collection with a test entity that is partially indexed.
         Entity testEntity = new Entity();
@@ -428,7 +428,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "two ='query'";
@@ -461,7 +461,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTesting = new HashMap<>();
 
@@ -475,7 +475,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one.key = 'value'";
@@ -511,7 +511,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTesting = new HashMap<>();
 
@@ -525,7 +525,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one.key = 'value'";
@@ -554,7 +554,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTestingSelectiveIndexing = new HashMap<>();
 
@@ -573,7 +573,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one.key.wowMoreKeys = 'value'";
@@ -609,7 +609,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTestingSelectiveIndexing = new HashMap<>();
 
@@ -629,7 +629,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "name = 'howdy'";
@@ -660,7 +660,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create test collection with a test entity that is partially indexed.
         Entity testEntity = new Entity();
@@ -669,11 +669,11 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         Entity postedEntity = this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         testEntity.put( "one","three" );
         this.app().collection( "testCollection" ).entity( postedEntity.getUuid() ).put( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one = 'three'";
@@ -715,7 +715,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         Entity user = this.app().collection("users").post(payload);
         assertEquals(user.get("username"), username);
         assertEquals(user.get("email"), email);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         String collectionName = "nestprofiles";
         //create a permission with the path "me" in it
@@ -743,7 +743,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         Entity nestProfile = this.app().collection(collectionName).post(payload);
         assertEquals(nestProfile.get("name"), profileName);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Entity nestprofileReturned = this.app().collection(collectionName).entity(nestProfile).get();
         assertEquals(nestprofileReturned.get("name"), profileName);
@@ -766,7 +766,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         assertEquals( calendarlistOne.get( "summaryOverview" ), summaryOverview );
         assertEquals(calendarlistOne.get("caltype"), calType);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //post a second entity
         payload = new Entity();
@@ -819,9 +819,9 @@ public class CollectionsResourceIT extends AbstractRestIT {
         assertNotSame( null,
             ((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName.toLowerCase() ));
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         this.app().collection( collectionName ).entity( testEntity.getEntity().getUuid() ).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
 
         //Verify that the collection still exists despite deleting its only entity.)
@@ -850,7 +850,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         payload.put("name", name);
         Entity user = this.app().collection("app_users").post(payload);
         assertEquals(user.get("name"), name);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Entity user2 = this.app().collection("app_users").entity(user).get();
 
@@ -880,7 +880,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         String randomizer = RandomStringUtils.randomAlphanumeric(10);
         String collectionName = "col_" + randomizer;
         app().collection( collectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // was the no-index wildcard saved and others ignored?
         Collection collection = app().collection( collectionName ).collection( "_settings" ).get();
@@ -923,7 +923,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         String randomizer = RandomStringUtils.randomAlphanumeric(10);
         String unIndexedCollectionName = "col_" + randomizer;
         app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String entityName1 = "unindexed1";
         Entity unindexed1 = this.app().collection( unIndexedCollectionName )
@@ -982,7 +982,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         String unIndexedCollectionName = "col_" + randomizer;
         app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String entityName1 = "unindexed1";
         Entity unindexed1 = this.app().collection( unIndexedCollectionName )
@@ -1018,7 +1018,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         app().collection( collectionName ).collection( "_settings" )
             .post( new Entity().chainPut( "fields", "all" ) );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get collection settings, should see no region
 
@@ -1051,7 +1051,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         app().collection( collectionName ).collection( "_settings" )
             .post( new Entity().chainPut( REGION_SETTING, "" ) );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get collection settings, should see no region
 
@@ -1091,14 +1091,14 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
 
         this.app().collection("notifications/"+ UUIDUtils.newTimeUUID()).post(payload );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Collection user2 = this.app().collection("notifications").get();
 
         assertEquals(1,user2.getNumOfEntities());
 
         this.app().collection("notifications/"+ UUIDUtils.newTimeUUID()).put(null,payload );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         user2 = this.app().collection("notifications").get();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
index 0776705..7e1c5a5 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
@@ -41,7 +41,7 @@ public class DuplicateNameIT extends AbstractRestIT {
         entity.put("name", "enzo");
         //Create an entity named "enzo" in the "things" collection
         entity = this.app().collection(collectionName).post(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         try {
             // Try to create a second entity in "things" with the name "enzo".
             this.app().collection(collectionName).post(entity);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
index c7f39b2..a9f5fee 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
@@ -59,7 +59,7 @@ public class ActivityResourceIT extends AbstractRestIT {
         this.activityDesc = "testActivity" ;
         this.activity = new ActivityEntity().putActor(current).chainPut("title", activityTitle).chainPut("content", activityDesc).chainPut("category", "testCategory").chainPut("verb", "POST");
         this.groupActivityResource = groupsResource.entity(entity).activities();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
     }
 
 
@@ -87,7 +87,7 @@ public class ActivityResourceIT extends AbstractRestIT {
         {
             throw e;
         }
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection results = groupActivityResource.get();
 
@@ -111,7 +111,7 @@ public class ActivityResourceIT extends AbstractRestIT {
         usersResource.entity(current).activities().post(activity);
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection results = usersResource.entity(current).activities().get();
 
@@ -136,7 +136,7 @@ public class ActivityResourceIT extends AbstractRestIT {
 
         this.app().collection("activities").post(activity);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection results = this.app().collection("activities").get();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
index d61d363..4ba8977 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
@@ -17,7 +17,6 @@
 package org.apache.usergrid.rest.applications.collection.activities;
 
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,7 +27,6 @@ import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
 import org.apache.usergrid.rest.test.resource.model.Collection;
 import org.apache.usergrid.rest.test.resource.model.Entity;
-import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +58,7 @@ public class PutTest extends AbstractRestIT {
             Entity activity = activities.post(new Entity(props));
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String query = "select * ";
 
@@ -72,7 +70,7 @@ public class PutTest extends AbstractRestIT {
         props.put( "actor", newActor );
         Entity activity = activities.post(new Entity(props));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         collection = activities.get(  );
         assertEquals( 6, collection.getResponse().getEntities().size() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
index 67cf19f..b73bcbd 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
@@ -17,11 +17,8 @@
 package org.apache.usergrid.rest.applications.collection.devices;
 
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
@@ -35,7 +32,6 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
-import org.junit.Ignore;
 
 import javax.ws.rs.ClientErrorException;
 
@@ -51,7 +47,7 @@ public class DevicesResourceIT extends AbstractRestIT {
 
         CollectionEndpoint devicesResource  =this.app().collection("devices");
         Entity entity = devicesResource.entity(uuid).put(payload);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create
         assertNotNull( entity );
@@ -62,7 +58,7 @@ public class DevicesResourceIT extends AbstractRestIT {
         ApiResponse deleteResponse =devicesResource.entity(uuid).delete();
         assertNotNull(deleteResponse.getEntities().get(0));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check deleted
         try {
@@ -72,7 +68,7 @@ public class DevicesResourceIT extends AbstractRestIT {
         catch ( ClientErrorException e ) {
             assertEquals( 404, e.getResponse().getStatus() );
         }
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create again
         entity = devicesResource.entity(uuid).put(payload);
@@ -80,7 +76,7 @@ public class DevicesResourceIT extends AbstractRestIT {
 
         assertNotNull( entity );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check existence
         entity = devicesResource.entity(uuid).get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
index 769852d..b94050b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.model.Collection;
 import org.apache.usergrid.rest.test.resource.model.Entity;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.ws.rs.ClientErrorException;
@@ -58,7 +57,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("groups").post(payload);
         assertEquals(entity.get("name"), groupName);
         assertEquals(entity.get("path"), groupPath);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         return entity;
     }
 
@@ -74,7 +73,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("roles").post(payload);
         assertEquals(entity.get("name"), roleName);
         assertEquals(entity.get("title"), roleTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         return entity;
     }
 
@@ -91,7 +90,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("users").post(payload);
         assertEquals(entity.get("username"), username);
         assertEquals(entity.get("email"), email);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         return entity;
     }
 
@@ -180,7 +179,7 @@ public class GroupResourceIT extends AbstractRestIT {
         group.put("path", newGroupPath);
         Entity groupResponse = this.app().collection("groups").entity(group).put(group);
         assertEquals(groupResponse.get("path"), newGroupPath);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //4. do a GET to verify the property really was set
         groupResponseGET = this.app().collection("groups").entity(group).get();
@@ -223,7 +222,7 @@ public class GroupResourceIT extends AbstractRestIT {
         // 3. add the user to the group
         Entity response = this.app().collection("users").entity(user).connection().collection("groups").entity(group).post();
         assertEquals(response.get("name"), groupName);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // 4. make sure the user is in the group
         Collection collection = this.app().collection("groups").entity(group).connection().collection("users").get();
@@ -237,7 +236,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
         //6. remove the user from the group
         this.app().collection("group").entity(group).connection().collection("users").entity(user).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //6. make sure the connection no longer exists
         collection = this.app().collection("group").entity(group).connection().collection("users").get();
@@ -266,12 +265,12 @@ public class GroupResourceIT extends AbstractRestIT {
         String roleName = "tester";
         String roleTitle = "tester";
         Entity role = this.createRole(roleName, roleTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //3. add role to the group
         Entity response = this.app().collection("roles").entity(role).connection().collection("groups").entity(group).post();
         assertEquals(response.get("name"), groupName);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //4. make sure the role is in the group
         Collection collection = this.app().collection("groups").entity(group).connection().collection("roles").get();
@@ -280,7 +279,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
         //5. remove Role from the group (should only delete the connection)
         this.app().collection("groups").entity(group).connection().collection("roles").entity(role).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //6. make sure the connection no longer exists
         collection = this.app().collection("groups").entity(group).connection().collection("roles").get();
@@ -294,7 +293,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
         //8. delete the role
         this.app().collection("role").entity(role).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(5000);
 
         //9. do a GET to make sure the role was deleted
@@ -359,7 +358,7 @@ public class GroupResourceIT extends AbstractRestIT {
         payload.put("name", catName);
         Entity fluffy = this.app().collection("cats").post(payload);
         assertEquals(fluffy.get("name"), catName);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //10. get the cat - permissions should allow this
         fluffy = this.app().collection("cats").uniqueID(catName).get();
@@ -436,7 +435,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
 
         //7. get all the users in the groups
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         Collection usersInGroup = this.app().collection("groups").uniqueID(groupName).connection("users").get();
         assertEquals(usersInGroup.getResponse().getEntityCount(), 2);
 
@@ -444,7 +443,7 @@ public class GroupResourceIT extends AbstractRestIT {
         this.app().collection("role").uniqueID("Default").delete();
         Entity data = new Entity().chainPut("name", "group1role");
         this.app().collection("roles").post(data);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Entity perms = new Entity();
         String permission = "get,post,put,delete:/groups/" + group.getUuid() + "/**";
@@ -452,18 +451,18 @@ public class GroupResourceIT extends AbstractRestIT {
         this.app().collection("roles").uniqueID("group1role").connection("permissions").post(perms);
         this.app().collection("roles").uniqueID("group1role").connection("users").uniqueID( user1Username ).post();
         this.app().collection("roles").uniqueID("group1role").connection("users").uniqueID( user2Username ).post();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //7b. everybody gets access to /activities
         perms = new Entity();
         permission = "get:/activities/**";
         perms.put("permission",permission);
         this.app().collection("roles").uniqueID("Guest").connection("permissions").post(perms);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user1Username ).post();
         this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user2Username ).post();
         this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user3Username ).post();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
 
         //8. post an activity to the group
@@ -482,7 +481,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity activityResponse = this.app().collection("groups")
             .uniqueID(groupName).connection("activities").post(activity);
         assertEquals(activityResponse.get("content"), content);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //11. log user1 in, should then be using the app user's token not the admin token
         this.getAppUserToken(user1Username, password);
@@ -565,7 +564,7 @@ public class GroupResourceIT extends AbstractRestIT {
         group.put("title", newTitle);
         Entity groupResponse = this.app().collection("groups").entity(group).put(group);
         assertEquals(groupResponse.get("title"), newTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // update that group by giving it a new title and using UUID in URL
         String evenNewerTitle = "Even New Title";
@@ -573,6 +572,6 @@ public class GroupResourceIT extends AbstractRestIT {
         String uuid = group.getAsString("uuid");
         groupResponse = this.app().collection("groups").uniqueID(uuid).put(group);
         assertEquals(groupResponse.get("title"), evenNewerTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
index 4ca46b1..da15b2e 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
@@ -130,7 +130,7 @@ public class PagingResourceIT extends AbstractRestIT {
 
             ApiResponse response = this.app().collection( collectionName ).delete( queryParameters );
 
-            this.refreshIndex();
+            this.waitForQueueDrainAndRefreshIndex();
 
             if(validate)
                 assertEquals("Entities should have been deleted", deletePageSize,response.getEntityCount() );
@@ -268,7 +268,7 @@ public class PagingResourceIT extends AbstractRestIT {
             entityPayload.put( "name", created );
             Entity entity = new Entity( entityPayload );
             entity = this.app().collection( collectionName ).post( entity );
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
             if(created == 1){
                 connectedEntity = entity;
             }
@@ -277,7 +277,7 @@ public class PagingResourceIT extends AbstractRestIT {
             }
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         QueryParameters qp = new QueryParameters();
         qp.setQuery("select * order by created asc");
@@ -323,7 +323,7 @@ public class PagingResourceIT extends AbstractRestIT {
             this.app().collection( collectionName ).post( entity );
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Creates query looking for entities with the very stop.
         String query = "select * where verb = 'stop'";
@@ -454,7 +454,7 @@ public class PagingResourceIT extends AbstractRestIT {
             }
         }
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         return entities;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
index 6202c6a..7315cad 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
@@ -63,7 +63,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         Entity objectOfDesire = new Entity();
         objectOfDesire.put( "codingmunchies", "doritoes" );
         objectOfDesire = this.app().collection( "snacks" ).post( objectOfDesire );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Entity toddWant = this.app().collection( "users" ).entity( todd ).collection( "likes" ).collection( "snacks" )
                               .entity( objectOfDesire ).post();
@@ -93,11 +93,11 @@ public class ConnectionResourceTest extends AbstractRestIT {
         thing2.put( "name", "thing2" );
         thing2 = this.app().collection( "things" ).post( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //create the connection: thing1 likes thing2
         this.app().collection( "things" ).entity( thing1 )
             .connection("likes").collection( "things" ).entity( thing2 ).post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //test we have the "likes" in our connection meta data response
         thing1 = this.app().collection( "things" ).entity( thing1 ).get();
@@ -150,14 +150,14 @@ public class ConnectionResourceTest extends AbstractRestIT {
         thing2.put( "name", "thing2" );
         thing2 = this.app().collection( "things" ).post( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //create the connection: thing1 likes thing2
         this.app().collection( "things" ).entity( thing1 )
             .connection("likes").collection( "things" ).entity( thing2 ).post();
         //delete thing2
         this.app().collection( "things" ).entity( thing2 ).delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             //attempt to retrieve thing1
@@ -185,14 +185,14 @@ public class ConnectionResourceTest extends AbstractRestIT {
         thing2.put( "name", "thing2" );
         thing2 = this.app().collection( "things" ).post( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //create the connection: thing1 likes thing2
         this.app().collection( "things" ).entity( thing1 )
             .connection("likes").collection( "things" ).entity( thing2 ).post();
         //delete thing1
         this.app().collection( "things" ).entity( thing1 ).delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             //attempt to retrieve thing1
@@ -236,7 +236,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //connect thing1 -> thing3
         connectionEndpoint.entity( thing3 ).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //now do a GET, we should see thing2 then thing3
 
@@ -257,7 +257,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //now re-post thing 2 it should appear second
         connectionEndpoint.entity( thing2 ).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         final ApiResponse order2 = connectionEndpoint.get().getResponse();
@@ -304,7 +304,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //connect thing1 -> thing3
         connectionEndpoint.entity( thing3 ).put( thing3 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //now do a GET, we should see thing2 then thing3
 
@@ -325,7 +325,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //now re-post thing 2 it should appear second
         connectionEndpoint.entity( thing2 ).put( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         final ApiResponse order2 = connectionEndpoint.get().getResponse();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
index cfd08e5..3393582 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
@@ -17,19 +17,15 @@
 package org.apache.usergrid.rest.applications.collection.users;
 
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
 import org.apache.usergrid.rest.test.resource.model.*;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 
-import org.apache.usergrid.utils.MapUtils;
-
 import javax.ws.rs.ClientErrorException;
 
 import static org.junit.Assert.assertEquals;
@@ -65,7 +61,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         user1 = new User(this.usersResource.post(user1));
         user2 = new User(this.usersResource.post(user2));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
     }
 
 
@@ -95,7 +91,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         //Revoke the user1 token
         usersResource.entity(user1).connection("revoketokens").post(new Entity().chainPut("token", token.getAccessToken()));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //See if we can still access the me entity after revoking its token
         try {
@@ -127,7 +123,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         // create device 1 on user1 devices
         usersResource.entity("me").collection("devices")
                .post(new Entity( ).chainPut("name", "device1").chainPut("number", "5551112222"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Clear the current user token
         this.app().token().clearToken();
@@ -137,7 +133,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         usersResource.entity("me").collection("devices")
                 .post(new Entity( ).chainPut("name", "device2").chainPut("number", "5552223333"));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Check that we can get back device1 on user1
         token = this.app().token().post(new Token(user1.getUsername(),"password"));
@@ -236,13 +232,13 @@ public class OwnershipResourceIT extends AbstractRestIT {
         // create a 4peaks restaurant
         Entity data = this.app().collection("restaurants").post(new Entity().chainPut("name", "4peaks"));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create a restaurant and link it to user1/me
         Entity fourPeaksData = usersResource.entity("me")
                 .connection("likes").collection( "restaurants" ).entity( "4peaks" ).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // anonymous user
         this.app().token().clearToken();
@@ -252,11 +248,11 @@ public class OwnershipResourceIT extends AbstractRestIT {
 
         data = this.app().collection("restaurants")
                       .post(new Entity().chainPut("name", "arrogantbutcher"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         data = usersResource.entity("me").connection( "likes" ).collection( "restaurants" )
                       .entity( "arrogantbutcher" ).post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String arrogantButcherId = data.getUuid().toString();
 
@@ -390,7 +386,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         //Sets up the cities collection with the city tempe
         Entity city = this.app().collection("cities").post(new Entity().chainPut("name", "tempe"));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create a 4peaks restaurant that is connected by a like to tempe.
         Entity data = this.app().collection("cities").entity( "tempe" ).connection( "likes" )
@@ -410,7 +406,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         CollectionEndpoint likeRestaurants =
                 this.app().collection("cities").entity( "tempe" ).connection( "likes" );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check we can get the resturant entities back via uuid without a collection name
         data = likeRestaurants.entity( peaksId ).get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
index aff952b..2dddcf6 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
@@ -66,7 +66,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         user = new User(USER,USER,USER+"@apigee.com","password");
         user = new User( this.app().collection("users").post(user));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
     }
 
 
@@ -86,13 +86,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertEquals( ROLE, node.get("name").toString() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Post the user with a specific role into the users collection
         node = this.app().collection("roles").entity(node).collection("users").entity(USER).post();
         assertNull( node.get( "error" ) );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now check the user has the role
         node =  this.app().collection("users").entity(USER).collection("roles").entity(ROLE).get();
@@ -104,7 +104,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         // now delete the role
         this.app().collection("users").entity(USER).collection("roles").entity(ROLE).delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check if the role was deleted
 
@@ -136,14 +136,14 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( node.get( "error" ) );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create a user that is in the group.
         node = this.app().collection("groups").entity(groupPath).collection("users").entity(user).post();
 
         assertNull( node.get( "error" ) );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Get the user and make sure that they are part of the group
         Collection groups = this.app().collection("users").entity(user).collection("groups").get();
@@ -157,7 +157,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( response.getError() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Check that the user no longer exists in the group
         int status = 0;
@@ -193,7 +193,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( entity.getError() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now try to add permission as the user, this should work
         addPermission( "usercreatedrole", "get,put,post:/foo/**" );
@@ -247,13 +247,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( node.getError() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // delete the default role to test permissions later
         ApiResponse response = this.app().collection("roles").entity("default").delete();
 
         assertNull( response.getError() );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Grants a permission to GET, POST, and PUT the reviews url for the reviewer role
         addPermission( "reviewer", "get,put,post:/reviews/**" );
@@ -266,22 +266,22 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         this.app().collection("groups").post(group);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Adds the reviewer to the reviewerGroup
         this.app().collection("groups").entity("reviewergroup").collection("roles").entity("reviewer").post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Adds reviewer2 user to the reviewergroup
         this.app().collection("users").entity("reviewer2").collection("groups").entity("reviewergroup").post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Adds reviewer1 to the reviewer role
         this.app().collection("users").entity("reviewer1").collection("roles").entity("reviewer").post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Set the current context to reviewer1
         this.app().token().post(new Token("reviewer1","password"));
@@ -295,7 +295,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
             .chainPut ("rating", "4").chainPut( "name", "4peaks").chainPut("review", "Huge beer selection" );
         this.app().collection("reviews").post(review);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get the reviews and assert they were created
         QueryParameters queryParameters = new QueryParameters();
@@ -330,7 +330,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertEquals( Response.Status.UNAUTHORIZED.getStatusCode(), status );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //TODO: maybe make this into two different tests?
 
@@ -346,7 +346,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
             .chainPut( "rating", "4" ).chainPut("name", "currycorner").chainPut( "review", "Authentic" );
         this.app().collection("reviews").post(review);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get all reviews as reviewer2
         queryParameters = new QueryParameters();
@@ -372,7 +372,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertEquals( Response.Status.UNAUTHORIZED.getStatusCode(), status );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         status = 0;
 
@@ -409,7 +409,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         Entity data = new Entity().chainPut("name", "reviewer");
         this.app().collection("roles").post(data);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // allow access to reviews excluding delete
         addPermission( "reviewer",
@@ -433,13 +433,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
                         "wildcardpermusertwo@apigee.com" );
         assertNotNull( userTwoId );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Add user1 to the reviewer role
         this.app().collection("users").entity(userOneId).collection("roles").entity("reviewer").post();
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Add a book to the books collection
         Entity book = new Entity().chainPut( "title", "Ready Player One" ).chainPut("author", "Earnest Cline");
@@ -449,7 +449,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         assertEquals( "Ready Player One", book.get("title").toString() );
         String bookId = book.get("uuid").toString();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Switch the contex to be that of user1
         this.app().token().post(new Token("wildcardpermuserone","password"));
@@ -461,7 +461,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         review = this.app().collection("reviews").post(review);
         String reviewId = review.get("uuid").toString();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // POST https://api.usergrid.com/my-org/my-app/users/me/wrote/review/${reviewId}
         this.app().collection("users").entity("me").connection("wrote").collection("review").entity(reviewId).post();
@@ -469,13 +469,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
         // POST https://api.usergrid.com/my-org/my-app/users/me/reviewed/review/${reviewId}
         this.app().collection("users").entity("me").connection("reviewed").collection("books").entity(bookId).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // POST https://api.usergrid.com/my-org/my-app/books/${bookId}/review/${reviewId}
         this.app().collection("books").entity(bookId).collection("review").entity(reviewId).post();
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now try to post the same thing to books to verify as userOne does not have correct permissions
         int status = 0;
@@ -522,7 +522,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         //allow patients to add doctors as their followers
         addPermission(  "patient", "delete,post:/users/*/following/users/${user}" );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create examplepatient
         UUID patientId =  createRoleUser( "examplepatient",  "examplepatient@apigee.com" );
@@ -531,12 +531,12 @@ public class PermissionsResourceIT extends AbstractRestIT {
         // create exampledoctor
         UUID doctorId = createRoleUser( "exampledoctor",  "exampledoctor@apigee.com" );
         assertNotNull( doctorId );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         // assign examplepatient the patient role
         this.app().collection("users").entity(patientId).collection("roles").entity("patient").post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         this.app().token().post(new Token("examplepatient","password"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //not working yet, used to be ignored
         //        this.app().collection("users").entity("exampledoctor").connection("following")
         // .collection("users").entity("examplepatient").post();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
index d5f7163..ddb1557 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
@@ -17,19 +17,15 @@
 package org.apache.usergrid.rest.applications.collection.users;
 
 
-import java.util.HashMap;
 import java.util.Map;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
-import org.apache.usergrid.rest.test.resource.endpoints.EntityEndpoint;
 import org.apache.usergrid.rest.test.resource.model.Entity;
 import org.apache.usergrid.rest.test.resource.model.QueryParameters;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +51,7 @@ public class RetrieveUsersTest extends AbstractRestIT {
 
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String query = "select *";
         String incorrectQuery = "select * where username = 'Alica'";
@@ -72,7 +68,7 @@ public class RetrieveUsersTest extends AbstractRestIT {
         props.put( "username", "Nina" );
 
         Entity entity = users.post(props);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> metadata = (Map)entity.get( "metadata" );
         Map<String,Object> sets = (Map)metadata.get( "sets" );