You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/04/02 23:23:18 UTC

[1/3] usergrid git commit: Switch DISTRIBUTED database queueing to not cache in memory by default, as the in-memory impl currently causes duplicate message processing quite often.

Repository: usergrid
Updated Branches:
  refs/heads/master 8b63aae7d -> d3e988bcb


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
index f8cb9d4e..af87ca5 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
@@ -75,7 +75,7 @@ public class UserResourceIT extends AbstractRestIT {
         usersResource = this.app().collection("users");
         userResource = this.app().collection("user");
 
-        clientSetup.refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
     }
 
     @Test
@@ -249,14 +249,14 @@ public class UserResourceIT extends AbstractRestIT {
         // same as above, but with actor partially filled out
 
         Entity entity = usersResource.entity(userId.toString()).activities().post(activity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         UUID firstActivityId = entity.getUuid();
 
         activity = new ActivityEntity("rod@rodsimpson.com", "POST", "activity 2");
         entity = usersResource.entity(userId.toString()).activities().post(activity);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         UUID secondActivityId = entity.getUuid();
 
@@ -284,7 +284,7 @@ public class UserResourceIT extends AbstractRestIT {
         map.put("email", email);
 
         Entity userEntity = usersResource.post(new Entity(map));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get the user with username property that has an email value
         Entity testUser = usersResource.entity(username).get();
@@ -315,7 +315,7 @@ public class UserResourceIT extends AbstractRestIT {
         map.put("email", email);
 
         usersResource.post(new Entity(map));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get the user with email property value
         // get the user with username property that has an email value
@@ -339,7 +339,7 @@ public class UserResourceIT extends AbstractRestIT {
         map.put("email", email);
 
         usersResource.post(new Entity(map));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get the user with email property value
         // get the user with username property that has an email value
@@ -409,7 +409,7 @@ public class UserResourceIT extends AbstractRestIT {
         Entity entity = usersResource.post(user);
         UUID createdId = entity.getUuid();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         Collection results = usersResource.get(new QueryParameters().setQuery(String.format("name = '%s'", name)));
         entity = new User(results.getResponse().getEntities().get(0));
         assertEquals(createdId, entity.getUuid());
@@ -429,13 +429,13 @@ public class UserResourceIT extends AbstractRestIT {
 
         UUID createdId = entity.getUuid();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Entity newEntity = usersResource.entity(createdId.toString()).get();
 
         userResource.entity(newEntity).delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection results = usersResource.get(
             new QueryParameters().setQuery(String.format("username = '%s'", username)));
@@ -460,7 +460,7 @@ public class UserResourceIT extends AbstractRestIT {
         User entity = new User(username, name, email, "password");
 
         entity = new User(usersResource.post(entity));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         UUID firstCreatedId = entity.getUuid();
         username = "username2";
@@ -470,7 +470,7 @@ public class UserResourceIT extends AbstractRestIT {
         entity = new User(username, name, email, "password");
 
         entity = new User(usersResource.post(entity));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         UUID secondCreatedId = entity.getUuid();
 
@@ -484,7 +484,7 @@ public class UserResourceIT extends AbstractRestIT {
 
         assertEquals(secondCreatedId.toString(), conn1.getUuid().toString());
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         Entity conn2 = usersResource.entity(
@@ -492,7 +492,7 @@ public class UserResourceIT extends AbstractRestIT {
 
         assertEquals(secondCreatedId.toString(), conn2.getUuid().toString());
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection conn1Connections = usersResource.entity(firstCreatedId.toString()).connection("conn1").get();
 
@@ -542,7 +542,7 @@ public class UserResourceIT extends AbstractRestIT {
 
         // now create a connection of "likes" between the first user and the
         // second using pluralized form
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // named entity in collection name
         Entity conn1 = usersResource.entity(firstCreatedId.toString()).connection("conn1", "users")
@@ -595,6 +595,7 @@ public class UserResourceIT extends AbstractRestIT {
         role = new Entity();
         role.put("name", "connectionQuerybyEmail2");
         role = this.app().collection("roles").post(role);
+        waitForQueueDrainAndRefreshIndex();
 
 
         UUID roleId2 = role.getUuid();
@@ -605,24 +606,24 @@ public class UserResourceIT extends AbstractRestIT {
         perms.put("permission", "get:/stuff/**");
         Entity perms2 = this.app().collection("roles").entity(roleId2.toString()).connection("permissions")
             .post(new Entity(perms));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
+
         //connect the entities where role is the root
         Entity perms3 = this.app().collection("roles").entity(roleId1.toString()).connection("users")
             .entity(userId.toString()).post();
-
-        // now create a connection of "likes" between the first user and the
-        // second using pluralized form
-
         assertEquals(userId.toString(), perms3.getUuid().toString());
+        waitForQueueDrainAndRefreshIndex();
 
 
+        // now create a connection of "likes" between the first user and the
+        // second using pluralized form
         //connect the second role
 
         Entity perms4 = this.app().collection("roles").entity(roleId2).connection("users").entity(userId).post();
-
         assertEquals(userId.toString(), perms4.getUuid().toString());
+        waitForQueueDrainAndRefreshIndex();
+
 
-        refreshIndex();
         //query the second role, it should work
         Collection userRoles = this.app().collection("roles").entity(roleId2).connection("users")
             .get(new QueryParameters().setQuery("select%20*%20where%20username%20=%20'" + email + "'"));
@@ -678,7 +679,7 @@ public class UserResourceIT extends AbstractRestIT {
         Entity pizzaEntity = this.app().collection("pizzas").post(pizza);
 
         UUID secondCreatedId = pizzaEntity.getUuid();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now create a connection of "likes" between the first user and the
         // second using pluralized form
@@ -708,7 +709,7 @@ public class UserResourceIT extends AbstractRestIT {
 
         Entity userEntity = usersResource.post(entity);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         // attempt to log in
         Token token = this.app().token().post(new Token(username, "password"));
 
@@ -728,7 +729,7 @@ public class UserResourceIT extends AbstractRestIT {
         userEntity = usersResource.entity(username).put(userEntity);
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         // now see if we've updated
 
 
@@ -782,7 +783,7 @@ public class UserResourceIT extends AbstractRestIT {
                     .chainPut("pin", "1234");
 
         usersResource.post(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection response = usersResource.get();
         // disable the user
@@ -798,7 +799,7 @@ public class UserResourceIT extends AbstractRestIT {
     public void test_PUT_password_fail() {
         Entity entity = usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
         this.app().token().post(new Token("edanuff", "sesame"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         boolean fail = false;
         try {
             Entity changeResponse = usersResource.entity("edanuff").collection("password")
@@ -829,17 +830,17 @@ public class UserResourceIT extends AbstractRestIT {
     @Test
     public void test_PUT_password_ok() {
         Entity entity = usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         usersResource.entity(entity).collection("password").post(new ChangePasswordEntity("sesame", "sesame1"));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         this.app().token().post(new Token("edanuff", "sesame1"));
 
         // if this was successful, we need to re-set the password for other
         // tests
         Entity changeResponse = usersResource.entity("edanuff").collection("password")
             .post(new ChangePasswordEntity("sesame1", "sesame"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         assertNotNull(changeResponse);
 
     }
@@ -849,14 +850,14 @@ public class UserResourceIT extends AbstractRestIT {
     public void setUserPasswordAsAdmin() throws IOException {
         usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
         String newPassword = "foofoo";
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // change the password as admin. The old password isn't required
         Entity node = usersResource.entity("edanuff").connection("password")
             .post(new ChangePasswordEntity(newPassword));
         assertNotNull(node);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         Token response = this.app().token().post(new Token("edanuff", newPassword));
         assertNotNull(response);
     }
@@ -899,7 +900,7 @@ public class UserResourceIT extends AbstractRestIT {
     public void testChangePassordToInvalidValue() {
 
         Entity entity = usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             usersResource.entity(entity).collection("password").post(new ChangePasswordEntity("sesame", "abc"));
@@ -930,12 +931,12 @@ public class UserResourceIT extends AbstractRestIT {
         this.app().collection("roles").post(role);
         // check it
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         // add Role
 
         role = usersResource.entity(createdId).collection("roles").entity(roleName).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         // check it
         assertNotNull(role);
         assertNotNull(role.get("name"));
@@ -966,7 +967,7 @@ public class UserResourceIT extends AbstractRestIT {
     public void revokeToken() throws Exception {
 
         this.app().collection("users").post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         Token token1 = this.app().token().post(new Token("edanuff", "sesame"));
         Token token2 = this.app().token().post(new Token("edanuff", "sesame"));
 
@@ -984,7 +985,7 @@ public class UserResourceIT extends AbstractRestIT {
         this.app().token().setToken(adminToken);
 
         usersResource.entity("edanuff").connection("revoketokens").post(new Entity().chainPut("token", token1));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         // the tokens shouldn't work
 
         int status = 0;
@@ -1032,7 +1033,7 @@ public class UserResourceIT extends AbstractRestIT {
         // now revoke the tokens
         this.app().token().setToken(adminToken);
         usersResource.entity("edanuff").connection("revoketokens").post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // the token3 shouldn't work
 
@@ -1070,7 +1071,7 @@ public class UserResourceIT extends AbstractRestIT {
         usersResource.post(new User("test_1", "Test1 User", "test_1@test.com", "test123")); // client.setApiUrl(apiUrl);
         usersResource.post(new User("test_2", "Test2 User", "test_2@test.com", "test123")); // client.setApiUrl(apiUrl);
         usersResource.post(new User("test_3", "Test3 User", "test_3@test.com", "test123")); // client.setApiUrl(apiUrl);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Entity appInfo = this.app().get().getResponse().getEntities().get(0);
 
@@ -1080,7 +1081,7 @@ public class UserResourceIT extends AbstractRestIT {
 
         assertNotNull(token.getAccessToken());
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         int status = 0;
 
@@ -1123,7 +1124,7 @@ public class UserResourceIT extends AbstractRestIT {
         Entity entityConn = usersResource.entity(userId).connection("deactivate").post(new Entity());
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             this.app().token().post(new Token("test_1", "test123"));
@@ -1141,7 +1142,7 @@ public class UserResourceIT extends AbstractRestIT {
         String randomName = "user1_" + UUIDUtils.newTimeUUID().toString();
         User user = new User(randomName, randomName, randomName + "@apigee.com", "password");
         usersResource.post(user);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // should update a field
         Entity response = usersResource.entity(randomName).get();
@@ -1155,7 +1156,7 @@ public class UserResourceIT extends AbstractRestIT {
 
         response = usersResource.post(user2);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Entity response2 = usersResource.entity(randomName).get();
 
@@ -1208,7 +1209,7 @@ public class UserResourceIT extends AbstractRestIT {
 
         assertNotNull(userId);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         ql = "uuid = " + userId;
 
@@ -1223,7 +1224,7 @@ public class UserResourceIT extends AbstractRestIT {
     public void testCredentialsTransfer() throws Exception {
 
         usersResource.post(new User("test_1", "Test1 User", "test_1@test.com", "test123")); // client.setApiUrl(apiUrl);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Entity appInfo = this.app().get().getResponse().getEntities().get(0);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
index 965105b..315f1f5 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
@@ -57,7 +57,7 @@ public class EventsResourceIT extends AbstractRestIT {
         assertNotNull(node.getEntities());
         String advertising = node.getEntity().get("uuid").toString();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         payload = new LinkedHashMap<String, Object>();
         payload.put( "timestamp", 0 );
@@ -74,7 +74,7 @@ public class EventsResourceIT extends AbstractRestIT {
         assertNotNull(node.getEntities());
         String sales = node.getEntity().get("uuid").toString();
 
-        refreshIndex( );
+        waitForQueueDrainAndRefreshIndex( );
 
         payload = new LinkedHashMap<String, Object>();
         payload.put( "timestamp", 0 );
@@ -91,7 +91,7 @@ public class EventsResourceIT extends AbstractRestIT {
         assertNotNull(node.getEntities());
         String marketing = node.getEntity().get( "uuid" ).toString();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String lastId = null;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
index f47fba0..74ad38b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
@@ -109,7 +109,7 @@ public class BasicGeoTests extends AbstractRestIT {
         assertEquals( lat.toString(), entity.getMap("location").get("latitude").toString() );
         assertEquals( lon.toString(), entity.getMap("location").get("longitude").toString() );
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //2. read back that entity make sure it is accurate
         /*
@@ -144,7 +144,7 @@ public class BasicGeoTests extends AbstractRestIT {
         assertEquals( newLon.toString(), entity.get( "location" ).get("longitude").asText() );
   */
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //4. read back the updated entity, make sure it is accurate
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
index 91ccf38..9a1cb3c 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
@@ -91,7 +91,7 @@ public class GeoPagingTest extends AbstractRestIT {
         }
 
     }
-    this.refreshIndex();
+    this.waitForQueueDrainAndRefreshIndex();
     // 2. Query the groups from a nearby location, restricting the search
     //    by creation time to a single entity where created[i-1] < created[i] < created[i+1]
       //since this geo location is contained by an actor it needs to be actor.location.
@@ -150,7 +150,7 @@ public class GeoPagingTest extends AbstractRestIT {
         .map("latitude", -33.889058)
         .map("longitude", 151.124024));
     this.app().collection(collectionType).post(props2);
-    this.refreshIndex();
+    this.waitForQueueDrainAndRefreshIndex();
 
     Collection collection = this.app().collection(collectionType).get();
     assertEquals("Should return both entities", 2, collection.getResponse().getEntityCount());
@@ -182,7 +182,7 @@ public class GeoPagingTest extends AbstractRestIT {
       cats[i] = cat;
       this.app().collection("cats").post(cat);
     }
-    this.refreshIndex();
+    this.waitForQueueDrainAndRefreshIndex();
 
     QueryParameters params = new QueryParameters();
     for (int consistent = 0; consistent < 20; consistent++) {
@@ -229,7 +229,7 @@ public class GeoPagingTest extends AbstractRestIT {
             cats[i] = cat;
             this.app().collection("cats").post(cat);
         }
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         final QueryParameters params = new QueryParameters();
         for (int consistent = 0; consistent < 20; consistent++) {
@@ -279,6 +279,7 @@ public class GeoPagingTest extends AbstractRestIT {
         .map("latitude", -33.746369)
         .map("longitude", 150.952183));
     this.app().collection(collectionType).post(props);
+    this.waitForQueueDrainAndRefreshIndex();
 
     Entity props2 = new Entity();
     props2.put("name", "usergrid2");
@@ -286,7 +287,7 @@ public class GeoPagingTest extends AbstractRestIT {
         .map("latitude", -33.889058)
         .map("longitude", 151.124024));
     this.app().collection(collectionType).post(props2);
-    this.refreshIndex();
+    this.waitForQueueDrainAndRefreshIndex();
 
     // 2. Query from the center point to ensure that one is returned
     Collection collection = this.app().collection(collectionType).get(queryClose);
@@ -326,7 +327,7 @@ public class GeoPagingTest extends AbstractRestIT {
         .map("longitude", 150.952183));
     this.app().collection("users").post(props);
 
-    this.refreshIndex();
+    this.waitForQueueDrainAndRefreshIndex();
     // 2. Create a list of geo points
     List<double[]> points = new ArrayList<>();
     points.add(new double []{33.746369, -89});//Woodland, MS

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
index 7525d51..02a54da 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
@@ -21,7 +21,6 @@ import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
 import org.apache.usergrid.rest.test.resource.model.Collection;
 import org.apache.usergrid.rest.test.resource.model.Entity;
 import org.apache.usergrid.rest.test.resource.model.QueryParameters;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.UUID;
@@ -78,7 +77,7 @@ public class MatrixQueryTests extends AbstractRestIT {
         restaurant2 = this.app().collection("restaurants").post(restaurant2);
         restaurant3 = this.app().collection("restaurants").post(restaurant3);
         restaurant4 = this.app().collection("restaurants").post(restaurant4);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //3. Create "likes" connections between users and restaurants
         //user 1 likes old major
@@ -91,7 +90,7 @@ public class MatrixQueryTests extends AbstractRestIT {
 
         //user 3 likes  Lola (it shouldn't appear in the results)
         this.app().collection("users").entity(user3).connection("likes").collection("restaurants").entity(restaurant4).post();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //4. Retrieve "likes" connections per user and ensure the correct restaurants are returned
         Collection user1likes = this.app().collection("users").entity(user1).connection("likes").get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
index a190526..6591713 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
@@ -246,7 +246,7 @@ public class OrderByTest extends QueryTestBase {
             }
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //2. Query without 'order by'
         String query = "select * where created > " + created;
         QueryParameters params = new QueryParameters().setQuery(query);
@@ -289,7 +289,7 @@ public class OrderByTest extends QueryTestBase {
             this.app().collection("activity").post(props);
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //2. Query a subset of the entities, specifying order and limit
         String query = "select * where created > " + 1 + " order by created desc";
         QueryParameters params = new QueryParameters().setQuery(query).setLimit(5);
@@ -334,7 +334,7 @@ public class OrderByTest extends QueryTestBase {
             logger.info(String.valueOf(Long.parseLong(activities[0].get("created").toString())));
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex(750);
 
 
         ArrayUtils.reverse(activities);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
index f7eb3fe..0adafef 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
@@ -65,7 +65,7 @@ public class QueryTestBase  extends AbstractRestIT {
             logger.info(entities[i].entrySet().toString());
         }
         //refresh the index so that they are immediately searchable
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         return entities;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
index 286c984..acf51c1 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
@@ -83,7 +83,7 @@ public class SelectMappingsQueryTest extends QueryTestBase {
             .withProp( "testProp", value )
             .withProp( "TESTPROP", otherValue);
         app().collection( collectionName ).post( entity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // testProp and TESTPROP should now have otherValue
 
@@ -110,7 +110,7 @@ public class SelectMappingsQueryTest extends QueryTestBase {
         Entity entity = new Entity()
             .withProp( "testprop", value );
         app().collection( collectionName ).post( entity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // testProp and TESTPROP should now have otherValue
 
@@ -130,7 +130,7 @@ public class SelectMappingsQueryTest extends QueryTestBase {
         Entity entity = new Entity()
             .withProp( "testprop", value );
         app().collection( collectionName ).post( entity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now query this without encoding the plus symbol
         QueryParameters params = new QueryParameters()
@@ -160,13 +160,13 @@ public class SelectMappingsQueryTest extends QueryTestBase {
         String value = RandomStringUtils.randomAlphabetic( 20 );
         Entity entity = new Entity().withProp( "testProp", value );
         app().collection( collectionName ).post( entity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // override with TESTPROP=newValue
         String newValue = RandomStringUtils.randomAlphabetic( 20 );
         entity = new Entity().withProp( "TESTPROP", newValue );
         app().collection( collectionName ).post( entity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // testProp and TESTPROP should new be queryable by new value
 
@@ -193,13 +193,13 @@ public class SelectMappingsQueryTest extends QueryTestBase {
         String value = RandomStringUtils.randomAlphabetic( 20 );
         Entity entity = new Entity().withProp( "TESTPROP", value );
         app().collection( collectionName ).post( entity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // override with testProp=newValue
         String newValue = RandomStringUtils.randomAlphabetic( 20 );
         entity = new Entity().withProp( "testProp", newValue );
         app().collection( collectionName ).post( entity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // testProp and TESTPROP should new be queryable by new value
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
index 431d224..841ac1d 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
@@ -153,7 +153,7 @@ public class AccessTokenIT extends AbstractRestIT {
         assertNotNull( token.getAccessToken() );
         management().token().setToken( token );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( management().me().get( Token.class ) );
 
@@ -177,7 +177,7 @@ public class AccessTokenIT extends AbstractRestIT {
         assertNotNull( adminToken );
         assertNotNull( adminToken.getAccessToken() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( management().me().get( Token.class ) );
 
@@ -237,7 +237,7 @@ public class AccessTokenIT extends AbstractRestIT {
         management().token().setToken( clientSetup.getSuperuserToken() );
         management().users().user( clientSetup.getUsername() ).revokeTokens().post(true , ApiResponse.class, null,null);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //test that token 1 doesn't work
@@ -278,7 +278,7 @@ public class AccessTokenIT extends AbstractRestIT {
         management().token().setToken( clientSetup.getSuperuserToken() );
         management().users().user( clientSetup.getUsername() ).revokeToken().post( false, ApiResponse.class,null,queryParameters );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //test that token 1 doesn't work

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
index f80f131..829f561 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
@@ -17,7 +17,6 @@
 
 package org.apache.usergrid.rest.management;
 
-import com.sun.jersey.api.client.UniformInterfaceException;
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.management.MockImapClient;
@@ -77,7 +76,7 @@ public class AdminUsersIT extends AbstractRestIT {
         // change the password as admin. The old password isn't required
         management.users().user( username ).password().post(Entity.class,passwordPayload);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //Get the token using the new password
         Token adminToken = management.token().post( false, Token.class, new Token( username, "testPassword" ) ,null );
@@ -159,7 +158,7 @@ public class AdminUsersIT extends AbstractRestIT {
         // change the password as admin. The old password isn't required
         management.users().user( username ).password().post(Entity.class, passwordPayload );
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
 
         //Get the token using the new password
@@ -197,7 +196,7 @@ public class AdminUsersIT extends AbstractRestIT {
         management.token().setToken( clientSetup.getSuperuserToken());
         management.users().user( username ).password().post( passwordPayload );
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( management.token().post( false, Token.class, new Token(username, "testPassword"), null ));
 
@@ -260,7 +259,7 @@ public class AdminUsersIT extends AbstractRestIT {
             //Send rest call to the /testProperties endpoint to persist property changes
             clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
 
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
 
             //Create organization for the admin user to be confirmed
             Organization organization = createOrgPayload( "testUnconfirmedAdminLogin", null );
@@ -342,7 +341,7 @@ public class AdminUsersIT extends AbstractRestIT {
 
             //Send rest call to the /testProperties endpoint to persist property changes
             clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
 
             Token superuserToken = management.token().post( Token.class,
                 new Token( clientSetup.getSuperuserName(), clientSetup.getSuperuserPassword() )  );
@@ -379,7 +378,7 @@ public class AdminUsersIT extends AbstractRestIT {
 
             //Send rest call to the /testProperties endpoint to persist property changes
             clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
 
             Token testToken = management().token().post(Token.class,
                 new Token( originalTestProperties.getAsString( PROPERTIES_TEST_ACCOUNT_ADMIN_USER_EMAIL ),
@@ -586,7 +585,7 @@ public class AdminUsersIT extends AbstractRestIT {
     public void reactivateTest() throws Exception {
         //call reactivate endpoint on default user
         clientSetup.getRestClient().management().users().user( clientSetup.getUsername() ).reactivate().get();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create mocked inbox and check to see if you recieved an email in the users inbox.
         List<Message> inbox = Mailbox.get( clientSetup.getEmail());
@@ -599,7 +598,7 @@ public class AdminUsersIT extends AbstractRestIT {
 
         // initiate password reset
         management().users().user( clientSetup.getUsername() ).resetpw().post(new Form());
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create mocked inbox, get password reset email and extract token
         List<Message> inbox = Mailbox.get( clientSetup.getEmail() );
@@ -630,7 +629,7 @@ public class AdminUsersIT extends AbstractRestIT {
 
         assertTrue( html.contains( "password set" ) );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         html = management().users().user( clientSetup.getUsername() ).resetpw().post( formData );
@@ -644,7 +643,7 @@ public class AdminUsersIT extends AbstractRestIT {
 
         // initiate password reset
         management().users().user( clientSetup.getUsername() ).resetpw().post(new Form());
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create mocked inbox, get password reset email and extract token
         List<Message> inbox = Mailbox.get( clientSetup.getEmail() );
@@ -725,7 +724,7 @@ public class AdminUsersIT extends AbstractRestIT {
         payload.put( "newpassword", passwords[1] );
         management().users().user( clientSetup.getUsername() ).password().post( Entity.class,payload );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         payload.put( "newpassword", passwords[0] );
         payload.put( "oldpassword", passwords[1] );
@@ -747,7 +746,7 @@ public class AdminUsersIT extends AbstractRestIT {
         // request password reset
 
         management().users().user( clientSetup.getUsername() ).resetpw().post(new Form());
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get resetpw token from email
 
@@ -774,7 +773,7 @@ public class AdminUsersIT extends AbstractRestIT {
         String html = management().users().user( clientSetup.getUsername() ).resetpw().getTarget().request()
             .post( javax.ws.rs.client.Entity.form(formData), String.class );
         assertTrue( html.contains( "password set" ) );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // login with new password and get token
 
@@ -797,7 +796,7 @@ public class AdminUsersIT extends AbstractRestIT {
             put("newpassword", "test");
         }};
         management().users().user( clientSetup.getUsername() ).password().post( false, payload, null );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get password and check password change time again
 
@@ -851,7 +850,7 @@ public class AdminUsersIT extends AbstractRestIT {
         //Create admin user
         management().orgs().org( clientSetup.getOrganizationName() ).users().post(ApiResponse.class ,adminUserPayload );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Retrieves the admin users
         ApiResponse adminUsers = management().orgs().org( clientSetup.getOrganizationName() ).users().get(ApiResponse.class);
@@ -905,7 +904,7 @@ public class AdminUsersIT extends AbstractRestIT {
             //Send rest call to the /testProperties endpoint to persist property changes
             clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
 
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
 
             //Retrieve properties and ensure that they are set correctly.
             ApiResponse apiResponse = clientSetup.getRestClient().testPropertiesResource().get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
index 81ff2d3..1b649d2 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
@@ -141,7 +141,7 @@ public class ExportResourceIT extends AbstractRestIT {
         assertNotNull( uuid );
 
         exportEntity = null;
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         try {
 
             exportEntity = management().orgs().org( clientSetup.getOrganizationName() )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
index c390393..9c35a6c 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
@@ -193,7 +193,7 @@ public class ImportResourceIT extends AbstractRestIT {
         Organization orgPayload = new Organization(
             newOrgName, newOrgUsername, newOrgEmail, newOrgName, newOrgPassword, null);
         Organization orgCreatedResponse = clientSetup.getRestClient().management().orgs().post(orgPayload);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         assertNotNull(orgCreatedResponse);
 
 
@@ -391,8 +391,8 @@ public class ImportResourceIT extends AbstractRestIT {
 //            for ( org.apache.usergrid.persistence.Entity importedThing : importedThings ) {
 //                emApp1.delete( importedThing );
 //            }
-//            emApp1.refreshIndex();
-//            emApp2.refreshIndex();
+//            emApp1.waitForQueueDrainAndRefreshIndex();
+//            emApp2.waitForQueueDrainAndRefreshIndex();
 //
 //            importedThings = emApp2.getCollection(
 //                appId2, "things", null, Query.Level.ALL_PROPERTIES).getEntities();
@@ -438,7 +438,7 @@ public class ImportResourceIT extends AbstractRestIT {
             .addToPath(importEntity.getUuid().toString())
             .get();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Entity importGetIncludes = this.management().orgs().org( org ).app()
             .addToPath(app)
@@ -671,7 +671,7 @@ public class ImportResourceIT extends AbstractRestIT {
             Thread.sleep(1000);
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         return importEntity;
     }
@@ -697,7 +697,7 @@ public class ImportResourceIT extends AbstractRestIT {
 
         }
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
 //        // first two things are related to each other
 //        em.createConnection(new SimpleEntityRef(type, created.get(0).getUuid()),
@@ -705,7 +705,7 @@ public class ImportResourceIT extends AbstractRestIT {
 //        em.createConnection(new SimpleEntityRef(type, created.get(1).getUuid()),
 //            "related", new SimpleEntityRef(type, created.get(0).getUuid()));
 //
-//        em.refreshIndex();
+//        em.waitForQueueDrainAndRefreshIndex();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
index 635368e..0a80d73 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
@@ -213,7 +213,7 @@ public class ManagementResourceIT extends AbstractRestIT {
             users1.add( "follower" + Integer.toString( i ) );
         }
 
-        refreshIndex(  );
+        waitForQueueDrainAndRefreshIndex(  );
 
         checkFeed( "leader1", users1 );
         //try with 11
@@ -230,20 +230,20 @@ public class ManagementResourceIT extends AbstractRestIT {
 
         //create user
         createUser( leader );
-        refreshIndex(   );
+        waitForQueueDrainAndRefreshIndex(   );
 
         String preFollowContent = leader + ": pre-something to look for " + UUID.randomUUID().toString();
 
         addActivity( leader, leader + " " + leader + "son", preFollowContent );
-        refreshIndex(  );
+        waitForQueueDrainAndRefreshIndex(  );
 
         String lastUser = followers.get( followers.size() - 1 );
         int i = 0;
         for ( String user : followers ) {
             createUser( user );
-            refreshIndex( );
+            waitForQueueDrainAndRefreshIndex( );
             follow( user, leader );
-            refreshIndex(  );
+            waitForQueueDrainAndRefreshIndex(  );
         }
         userFeed = getUserFeed( lastUser );
         assertTrue( userFeed.size() == 1 );
@@ -254,7 +254,7 @@ public class ManagementResourceIT extends AbstractRestIT {
         String postFollowContent = leader + ": something to look for " + UUID.randomUUID().toString();
         addActivity( leader, leader + " " + leader + "son", postFollowContent );
 
-        refreshIndex(  );
+        waitForQueueDrainAndRefreshIndex(  );
 
         //check feed
         userFeed = getUserFeed( lastUser );
@@ -321,7 +321,7 @@ public class ManagementResourceIT extends AbstractRestIT {
             .post( new Application( "mgmt-org-app" ) );
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Entity appdata = apiResponse.getEntities().get(0);
         assertEquals((clientSetup.getOrganizationName() + "/mgmt-org-app")
@@ -336,7 +336,7 @@ public class ManagementResourceIT extends AbstractRestIT {
         assertEquals("Roles", roles.get("title").toString());
         assertEquals(4, roles.size());
 
-        refreshIndex(   );
+        waitForQueueDrainAndRefreshIndex(   );
 
         // GET /applications/mgmt-org-app
 
@@ -361,7 +361,7 @@ public class ManagementResourceIT extends AbstractRestIT {
     public void checkSizes() throws Exception {
         final String appname = clientSetup.getAppName();
         this.app().collection("testCollection").post(new Entity().chainPut("name","test"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         Entity size = management().orgs().org( clientSetup.getOrganizationName() ).app().addToPath(appname).addToPath("_size").get();
         Entity rolesSize = management().orgs().org(clientSetup.getOrganizationName()).app().addToPath(appname).addToPath("roles/_size").get();
         Entity collectionsSize = management().orgs().org(clientSetup.getOrganizationName()).app().addToPath(appname).addToPath("collections/_size").get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
index 2bbdaaf..ad204ae 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
@@ -68,7 +68,7 @@ public class OrganizationsIT extends AbstractRestIT {
         assertNotNull( organizationResponse );
 
 //        Thread.sleep( 1000 );
-//        this.refreshIndex();
+//        this.waitForQueueDrainAndRefreshIndex();
 
         //Creates token
         Token token =
@@ -78,7 +78,7 @@ public class OrganizationsIT extends AbstractRestIT {
 
         assertNotNull( token );
 
-        //this.refreshIndex();
+        //this.waitForQueueDrainAndRefreshIndex();
 
         //Assert that the get returns the correct org and owner.
         Organization returnedOrg = clientSetup.getRestClient().management().orgs().org( organization.getOrganization() ).get();
@@ -136,7 +136,7 @@ public class OrganizationsIT extends AbstractRestIT {
         // Create organization
         Organization organization = createOrgPayload( "testCreateDuplicateOrgName", null );
         Organization orgCreatedResponse = clientSetup.getRestClient().management().orgs().post( organization );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( orgCreatedResponse );
 
@@ -193,7 +193,7 @@ public class OrganizationsIT extends AbstractRestIT {
         //create the org/owner
         Organization orgCreatedResponse = clientSetup.getRestClient().management().orgs().post( organization );
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( orgCreatedResponse );
 
@@ -394,7 +394,7 @@ public class OrganizationsIT extends AbstractRestIT {
         //update the organization.
         management().orgs().org( clientSetup.getOrganizationName() ).put(orgPayload);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //retrieve the organization
         Organization orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
@@ -408,7 +408,7 @@ public class OrganizationsIT extends AbstractRestIT {
         //update the organization.
         management().orgs().org( clientSetup.getOrganizationName() ).put(orgPayload);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
 
@@ -433,7 +433,7 @@ public class OrganizationsIT extends AbstractRestIT {
         Organization orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
 
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         //attempt to post duplicate connection
         Entity userPostResponse = management().orgs().org( clientSetup.getOrganizationName() ).users().user( clientSetup.getEmail() ).put( entity );
 
@@ -461,7 +461,7 @@ public class OrganizationsIT extends AbstractRestIT {
         Organization orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
 
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         //attempt to post duplicate connection
         try {
             Entity userPostResponse = management().orgs().org( clientSetup.getOrganizationName() ).users().post( Entity.class, entity );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
index 8404632..90a6919 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
@@ -170,7 +170,7 @@ public class RegistrationIT extends AbstractRestIT {
                 "changeme");
             UUID userId = node.getUuid();
 
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
 
             String subject = "Password Reset";
 
@@ -239,7 +239,7 @@ public class RegistrationIT extends AbstractRestIT {
 
             //Disgusting data manipulation to parse the form response.
             Map adminUserPostResponse = (management().users().post( User.class, userForm ));
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
 
             Map adminDataMap = ( Map ) adminUserPostResponse.get( "data" );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
index 6e5e4f9..4799b0c 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
@@ -173,10 +173,12 @@ public class AbstractRestIT extends JerseyTest {
         return this.app().token().post( new Token( username, password ) );
     }
 
-    public void refreshIndex() {
-        //TODO see how we can refresh index (not async) for tests so sleep may not be needed
+    public void waitForQueueDrainAndRefreshIndex(int waitTimeMillis) {
+        // indexing is async, tests will need to wait for stuff to be processed.
+        // this sleep is slightly longer because distributed queueing on top of Cassandra can be used without an in-mem
+        // copy.  see Qakka in the persistence module
         try {
-            Thread.sleep(250); //ensure index docs are finished being sent to Elasticsearch by Usergrid before refresh
+            Thread.sleep(waitTimeMillis);
             clientSetup.refreshIndex();
         } catch (InterruptedException e) {
             System.out.println("Error refreshing index");
@@ -184,6 +186,10 @@ public class AbstractRestIT extends JerseyTest {
         }
     }
 
+    public void waitForQueueDrainAndRefreshIndex() {
+        waitForQueueDrainAndRefreshIndex(750);
+    }
+
 
     /**
      * Takes in the expectedStatus message and the expectedErrorMessage then compares it to the ClientErrorException

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties
index 98c5640..615bedd 100644
--- a/stack/rest/src/test/resources/usergrid-custom-test.properties
+++ b/stack/rest/src/test/resources/usergrid-custom-test.properties
@@ -72,6 +72,11 @@ elasticsearch.queue_impl=DISTRIBUTED
 # Queueing Test Settings
 # Reduce the long polling time for the tests
 queue.long.polling.time.millis=50
-queue.num.actors=50
+queue.num.actors=100
 queue.sender.num.actors=100
 queue.writer.num.actors=100
+elasticsearch.worker_count=12
+elasticsearch.worker_count_utility=4
+queue.get.timeout.seconds=10
+queue.send.timeout.seconds=10
+usergrid.push.worker_count=8

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7d02360..22d7344 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -20,6 +20,7 @@ import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
 import com.google.inject.Injector;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
@@ -148,7 +149,7 @@ public class QueueListener  {
             Thread.currentThread().setDaemon(true);
         }
 
-        Thread.currentThread().setName(getClass().getSimpleName()+"_PushNotifications-"+threadNumber);
+        Thread.currentThread().setName(getClass().getSimpleName()+"_Push-"+ RandomStringUtils.randomAlphanumeric(4)+"-"+threadNumber);
 
         final AtomicInteger consecutiveExceptions = new AtomicInteger();
 
@@ -268,12 +269,16 @@ public class QueueListener  {
                                 if (logger.isTraceEnabled()) {
                                     logger.trace("no messages...sleep...{}", sleepWhenNoneFound);
                                 }
-                                Thread.sleep(sleepWhenNoneFound);
+                                try {
+                                    Thread.sleep(sleepWhenNoneFound);
+                                } catch (InterruptedException e){
+                                    // noop
+                                }
                             }
                             timerContext.stop();
                             //send to the providers
                             consecutiveExceptions.set(0);
-                        }catch (Exception ex){
+                        } catch (Exception ex){
                             logger.error("failed to dequeue",ex);
 
                             // clear the queue name cache b/c tests might have wiped the keyspace
@@ -286,7 +291,7 @@ public class QueueListener  {
                                 Thread.sleep(sleeptime);
                             }catch (InterruptedException ie){
                                 if (logger.isTraceEnabled()) {
-                                    logger.info("sleep interrupted");
+                                    logger.trace("sleep interrupted");
                                 }
                             }
                         }
@@ -306,7 +311,7 @@ public class QueueListener  {
             return;
         }
         for(Future future : futures){
-            future.cancel(true);
+            future.cancel(false);
         }
 
         pool.shutdownNow();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index 6b619b7..44b0139 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -16,7 +16,6 @@
  */
 package org.apache.usergrid.services.notifications.gcm;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.android.gcm.server.*;
 import org.apache.usergrid.persistence.entities.Notification;
@@ -99,7 +98,7 @@ public class GCMAdapter implements ProviderAdapter {
         if(!map.containsKey(priorityKey) && notification.getPriority() != null){
             map.put(priorityKey, notification.getPriority());
         }
-        Batch batch = getBatch( map);
+        Batch batch = getBatch( map );
         batch.add(providerId, tracker);
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java b/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
index 89ff272..74c5c92 100644
--- a/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
+++ b/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
@@ -73,7 +73,7 @@ public class ServiceApplication extends CoreApplication {
         ServiceResults testRequest = testRequest( action, expectedCount, true, params );
 
         if ( !action.equals( ServiceAction.GET )) {
-            this.refreshIndex();
+            this.waitForQueueDrainAndRefreshIndex();
         }
 
         return testRequest;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java b/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
index a46bd60..60a5ba0 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
@@ -235,7 +235,7 @@ public class EmailFlowIT {
         assertNotNull( orgOwner );
 
         ApplicationInfo app = setup.getMgmtSvc().createApplication( orgOwner.getOrganization().getUuid(), appName );
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         //turn on app admin approval for app users
         enableAdminApproval(app.getId());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java b/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
index 075ee03..20d12ab 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
@@ -62,7 +62,7 @@ public class RoleIT {
 
         UUID applicationId = setup.getMgmtSvc().createApplication( organization.getUuid(), "test-app" ).getId();
         EntityManager em = setup.getEmf().getEntityManager( applicationId );
-        setup.getEntityIndex().refresh(em.getApplicationId());
+        setup.getEntityIndex().waitForQueueDrainAndRefresh(em.getApplicationId(), 500);
 
         Map<String, Object> properties = new LinkedHashMap<String, Object>();
         properties.put( "username", "edanuff5" );
@@ -71,8 +71,7 @@ public class RoleIT {
         User user = em.create( User.ENTITY_TYPE, User.class, properties );
 
         em.createRole( "logged-in", "Logged In", 2000 );
-        setup.getEntityIndex().refresh(em.getApplicationId());
-        setup.getEntityIndex().refresh(em.getApplicationId());
+        setup.getEntityIndex().waitForQueueDrainAndRefresh(em.getApplicationId(), 500);
         em.addUserToRole( user.getUuid(), "logged-in" );
 
         String accessToken = setup.getMgmtSvc().getAccessTokenForAppUser( applicationId, user.getUuid(), 0 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
index c071d1f..62818c2 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
@@ -232,7 +232,7 @@ public class CollectionServiceIT extends AbstractServiceIT {
             // ok
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         try {
             // try DELETE on cats with dogs name
             app.testRequest( ServiceAction.DELETE, 0, "cats", "Danny" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
index d3c2436..d37bb10 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
@@ -68,13 +68,14 @@ public class GroupServiceIT extends AbstractServiceIT {
         app.createGroupRole( group.getUuid(), "admin", 0 );
         app.createGroupRole( group.getUuid(), "author", 0 );
 
-        setup.getEntityIndex().refresh(app.getId());
+        app.waitForQueueDrainAndRefreshIndex(500);
 
 
         app.grantGroupRolePermission( group.getUuid(), "admin", "users:access:*" );
         app.grantGroupRolePermission( group.getUuid(), "admin", "groups:access:*" );
         app.grantGroupRolePermission( group.getUuid(), "author", "assets:access:*" );
-        setup.getEntityIndex().refresh(app.getId());
+
+        app.waitForQueueDrainAndRefreshIndex(500);
 
         app.testDataRequest( ServiceAction.GET, "groups", group.getUuid(), "rolenames" );
         app.testDataRequest( ServiceAction.GET, "groups", group.getUuid(), "roles", "admin", "permissions" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
index 8c2be2c..81dced1 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
@@ -82,6 +82,8 @@ public class ServiceInvocationIT extends AbstractServiceIT {
 
         app.testRequest( ServiceAction.POST, 1, null, "users", "edanuff", "likes", cat.getUuid() );
 
+        app.waitForQueueDrainAndRefreshIndex(250);
+
         Entity restaurant = app.doCreate( "restaurant", "Brickhouse" );
 
         app.createConnection( user, "likes", restaurant );
@@ -92,6 +94,8 @@ public class ServiceInvocationIT extends AbstractServiceIT {
 
         app.testRequest( ServiceAction.POST, 1, "users", user.getUuid(), "connections", "likes", restaurant.getUuid() );
 
+        app.waitForQueueDrainAndRefreshIndex(250);
+
         app.testRequest( ServiceAction.GET, 1, "users", "edanuff", "likes", "cats" );
 
         app.testRequest( ServiceAction.GET, 3, "users", "edanuff", "likes" );
@@ -104,7 +108,7 @@ public class ServiceInvocationIT extends AbstractServiceIT {
         app.testRequest( ServiceAction.GET, 1, "users", "edanuff", "likes",
                 Query.fromQL( "select * where name='axis*'" ) );
 
-//        TODO, we don't allow this at the RESt level, why is this a test?
+//        TODO, we don't allow this at the REST level, why is this a test?
 //        app.testRequest( ServiceAction.GET, 3, null, "users", "edanuff", "connections" );
 
         app.put( "color", "blacknwhite" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
index 5ea815f..c035192 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
@@ -28,7 +28,6 @@ import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.services.AbstractServiceIT;
-import org.apache.usergrid.services.notifications.gcm.NotificationsServiceIT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +51,7 @@ public abstract class AbstractServiceNotificationIT extends AbstractServiceIT {
             throws Exception {
         long timeout = System.currentTimeMillis() + 60000;
         while (System.currentTimeMillis() < timeout) {
-            Thread.sleep(200);
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex(200);
             notification = app.getEntityManager().get(notification.getUuid(), Notification.class);
             if (notification.getFinished() != null) {
                 return notification;
@@ -95,10 +93,9 @@ public abstract class AbstractServiceNotificationIT extends AbstractServiceIT {
             }
         }
 
-        //assertEquals(expected, receipts.size());
-        if( expected != receipts.size()){
-            logger.warn("Expected receipt count {} does not match actual count {}", expected, receipts.size());
-        }
+
+        assertEquals(expected, receipts.size());
+
 
         for (EntityRef receipt : receipts) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 2a757ca..1c3bbcd 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -117,7 +117,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
                 setup.getEntityIndex().refresh(app.getId());
 
         listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(), new Properties());
-        listener.DEFAULT_SLEEP = 200;
         listener.start();
     }
 
@@ -234,24 +233,24 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         );
 
 
-        // verify Query for CREATED state
+        notificationWaitForComplete(notification);
+        app.waitForQueueDrainAndRefreshIndex(250);
+
+        // verify Query for FINISHED state and that the devices processed is 0
         Query query =  Query.fromEquals( "state", Notification.State.FINISHED.toString() );
         Results results = app.getEntityManager().searchCollection(app.getEntityManager().getApplicationRef(), "notifications", query);
-        Entity entity = results.getEntitiesMap().get(notification.getUuid());
-        assertNotNull(entity);
-
-        notificationWaitForComplete(notification);
+        notification = (Notification)results.getEntitiesMap().get(notification.getUuid()).toTypedEntity();
+        assertEquals(0, notification.getDeviceProcessedCount());
 
         // perform push //
 
-        //ns.getQueueManager().processBatchAndReschedule(notification, null);
         notification = app.getEntityManager().get(e.getUuid(), Notification.class);
 
         // verify Query for FINISHED state
         query = Query.fromEquals("state", Notification.State.FINISHED.toString());
         results = app.getEntityManager().searchCollection(app.getEntityManager().getApplicationRef(),
                 "notifications", query);
-        entity = results.getEntitiesMap().get(notification.getUuid());
+        Entity entity = results.getEntitiesMap().get(notification.getUuid());
         assertNotNull(entity);
 
         notification = (Notification) entity.toTypedEntity();
@@ -501,7 +500,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     @Test
     public void oneDeviceTwoNotifiers() throws Exception {
 
-        // This test should configure 2 notifiers on a device and ensure that we can send to one of them
+        // This test should configure 2 notifiers on device1 and ensure that we can send to one of them
 
         // create a 2nd notifier //
         Object notifierName1 = "apNs2";
@@ -525,17 +524,19 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         assertEquals(notifier2.getProvider(), PROVIDER);
         assertEquals(notifier2.getEnvironment(), environment1);
 
+
+        // Add a device token for the 2nd notifier
+        app.clear();
         String key2 = notifier2.getName() + NOTIFIER_ID_POSTFIX;
-        device1.setProperty(key2, PUSH_TOKEN);
-        app.getEntityManager().update(device1);
-        setup.getEntityIndex().refresh(app.getId()); // need to refresh the index after an update
+        app.put(key2, PUSH_TOKEN);
+        app.testRequest(ServiceAction.PUT, 1, "devices", device1).getEntity();
 
 
         // create push notification //
         app.clear();
         String payload = getPayload();
         Map<String, String> payloads = new HashMap<String, String>(1);
-        payloads.put(notifier.getUuid().toString(), payload);
+        payloads.put(notifierName, payload);
         app.put("payloads", payloads);
         app.put("debug",true);
 
@@ -543,14 +544,14 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
             "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", notificationEntity.getUuid());
 
-        Notification notification = app.getEntityManager().get(notificationEntity.getUuid(),
-                Notification.class);
-        assertEquals(
-                notification.getPayloads().get(notifier.getUuid().toString()),
-                payload);
+        Notification notification = app.getEntityManager().get(notificationEntity.getUuid(), Notification.class);
+        assertEquals(payload, notification.getPayloads().get(notifierName));
 
         // perform push //
         notification = notificationWaitForComplete(notification);
+
+        app.waitForQueueDrainAndRefreshIndex(2500);
+
         checkReceipts(notification, 1);
     }
 
@@ -692,6 +693,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         // perform push //
         notification = notificationWaitForComplete(notification);
+
+        app.waitForQueueDrainAndRefreshIndex(250);
+
         checkReceipts(notification, 2);
 
         // Statistics are not accurate.  See - https://issues.apache.org/jira/browse/USERGRID-1207

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 1a9f4f7..8360009 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -62,13 +62,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     private NotificationsService ns;
     private QueueListener listener;
 
-
-    @BeforeClass
-    public static void setup() {
-
-
-    }
-
     @Before
     public void before() throws Exception {
 
@@ -103,7 +96,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         ns = getNotificationService();
 
         listener = new QueueListener(ns.getServiceManagerFactory(), ns.getEntityManagerFactory(), new Properties());
-        listener.DEFAULT_SLEEP = 200;
         listener.start();
     }
 
@@ -569,6 +561,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         // wait for notification to be marked finished
         notification = notificationWaitForComplete(notification);
 
+        // receipts are created and queried, wait a bit longer for this to happen as indexing
+        app.waitForQueueDrainAndRefreshIndex(500);
+
         // get the receipts entity IDs
         List<EntityRef> receipts = getNotificationReceipts(notification);
         assertEquals(1, receipts.size());
@@ -635,6 +630,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         notification = notificationWaitForComplete(notification);
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
+
+        // receipts are created and queried, wait a bit longer for this to happen as indexing
+        app.waitForQueueDrainAndRefreshIndex(500);
+
         // get the receipts entity IDs
         List<EntityRef> receipts = getNotificationReceipts(notification);
         assertEquals(1, receipts.size());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/services/src/test/resources/usergrid-custom-test.properties b/stack/services/src/test/resources/usergrid-custom-test.properties
index 49f8b5d..bcc8b8e 100644
--- a/stack/services/src/test/resources/usergrid-custom-test.properties
+++ b/stack/services/src/test/resources/usergrid-custom-test.properties
@@ -38,13 +38,14 @@ elasticsearch.queue_impl.resolution=true
 elasticsearch.queue_impl=DISTRIBUTED
 
 # Queueing Test Settings
-# Reduce the long polling time for the tests
-queue.long.polling.time.millis=150
-queue.num.actors=5
-queue.sender.num.actors=5
-queue.writer.num.actors=5
-elasticsearch.worker_count=2
-usergrid.push.worker_count=2
+queue.long.polling.time.millis=50
+queue.num.actors=50
+queue.sender.num.actors=50
+queue.writer.num.actors=50
+queue.get.timeout.seconds=10
+elasticsearch.worker_count=8
+elasticsearch.worker_count_utility=8
+usergrid.push.worker_count=8
 
 
 # This property is required to be set and cannot be defaulted anywhere


[3/3] usergrid git commit: Switch DISTRIBUTED database queueing to default not cache in memory as the in memory impl causes duplicate message processing quite often at the moment.

Posted by mr...@apache.org.
Switch DISTRIBUTED database queueing to default not cache in memory as the in memory impl causes duplicate message processing quite often at the moment.

 - includes making all the tests work without the in-memory queue fronting the database queue, which really means adding some more delay in tests
 - the tests are actually faster now, because the original refreshIndex() created and queried random entities, which took longer in most cases
 - uncommented the checkReceipts function in Notification tests as these are now passing, with an added fix for duplicate receipt creation
 - some logging updates in the distributed database queueing impl (Qakka)
 - increased the default take to 500 from the queue when DISTRIBUTED database queueing is configured ( which is the default now )
 - Notifications Queue Listener thread names now include a random identifier
 - reduced the DISTRIBUTED database queueing default long poll to 1 second from 5 seconds


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d3e988bc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d3e988bc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d3e988bc

Branch: refs/heads/master
Commit: d3e988bcbb7eb417c84cfda7396ec3506521aa37
Parents: 8b63aae
Author: Michael Russo <ru...@google.com>
Authored: Sun Apr 2 16:14:05 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Sun Apr 2 16:14:05 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 44 +---------
 .../asyncevents/AsyncIndexProvider.java         |  5 ++
 .../java/org/apache/usergrid/Application.java   |  4 +-
 .../org/apache/usergrid/CoreApplication.java    | 22 ++---
 .../org/apache/usergrid/CoreITSetupImpl.java    | 20 ++++-
 .../org/apache/usergrid/TestEntityIndex.java    |  1 +
 .../corepersistence/AggregationServiceTest.java | 15 ++--
 .../corepersistence/StaleIndexCleanupTest.java  | 17 ++--
 .../persistence/ApplicationServiceIT.java       |  6 +-
 .../usergrid/persistence/CollectionIT.java      | 74 ++++++++--------
 .../usergrid/persistence/CountingMutatorIT.java |  4 +-
 .../persistence/EntityConnectionsIT.java        | 16 ++--
 .../usergrid/persistence/EntityManagerIT.java   | 20 ++---
 .../org/apache/usergrid/persistence/GeoIT.java  | 32 +++----
 .../persistence/GeoQueryBooleanTest.java        |  4 +-
 .../apache/usergrid/persistence/IndexIT.java    | 18 ++--
 .../usergrid/persistence/PathQueryIT.java       |  7 +-
 .../usergrid/persistence/PermissionsIT.java     |  6 +-
 .../usergrid/persistence/RebuildIndexTest.java  | 26 +++---
 .../cassandra/EntityManagerFactoryImplIT.java   | 13 +--
 .../persistence/query/ConnectionHelper.java     |  2 +-
 .../query/IntersectionTransitivePagingIT.java   |  2 +-
 .../query/IntersectionUnionPagingIT.java        |  2 +-
 .../persistence/query/IteratingQueryIT.java     | 32 +++----
 .../persistence/query/NotSubPropertyIT.java     |  2 +-
 .../persistence/query/ParenthesisProblemIT.java |  2 +-
 .../resources/usergrid-custom-test.properties   |  3 +
 .../usergrid/persistence/qakka/QakkaFig.java    |  4 +-
 .../qakka/core/impl/InMemoryQueue.java          |  4 +-
 .../core/impl/QueueMessageManagerImpl.java      |  9 +-
 .../distributed/actors/QueueActorHelper.java    | 15 +++-
 .../distributed/actors/QueueActorRouter.java    |  2 +-
 .../distributed/actors/ShardAllocator.java      |  4 +-
 .../impl/DistributedQueueServiceImpl.java       | 26 ++++--
 .../distributed/actors/ShardAllocatorTest.java  |  3 +
 .../queue/src/test/resources/qakka.properties   |  5 +-
 .../usergrid/rest/CollectionMetadataIT.java     |  4 +-
 .../apache/usergrid/rest/NotificationsIT.java   | 14 ++-
 .../apache/usergrid/rest/PartialUpdateTest.java |  6 +-
 .../apache/usergrid/rest/SystemResourceIT.java  |  3 +-
 .../rest/applications/ApplicationCreateIT.java  |  2 +-
 .../rest/applications/ApplicationDeleteIT.java  |  6 +-
 .../applications/ApplicationResourceIT.java     | 12 +--
 .../applications/assets/AssetResourceIT.java    | 26 +++---
 .../applications/assets/AwsAssetResourceIT.java | 22 ++---
 .../collection/BrowserCompatibilityTest.java    |  2 +-
 .../collection/CollectionsResourceIT.java       | 70 +++++++--------
 .../collection/DuplicateNameIT.java             |  2 +-
 .../activities/ActivityResourceIT.java          |  8 +-
 .../collection/activities/PutTest.java          |  6 +-
 .../collection/devices/DevicesResourceIT.java   | 12 +--
 .../collection/groups/GroupResourceIT.java      | 39 +++++----
 .../collection/paging/PagingResourceIT.java     | 10 +--
 .../users/ConnectionResourceTest.java           | 22 ++---
 .../collection/users/OwnershipResourceIT.java   | 24 +++---
 .../collection/users/PermissionsResourceIT.java | 58 ++++++-------
 .../collection/users/RetrieveUsersTest.java     |  8 +-
 .../collection/users/UserResourceIT.java        | 89 ++++++++++----------
 .../applications/events/EventsResourceIT.java   |  6 +-
 .../applications/queries/BasicGeoTests.java     |  4 +-
 .../applications/queries/GeoPagingTest.java     | 13 +--
 .../applications/queries/MatrixQueryTests.java  |  5 +-
 .../rest/applications/queries/OrderByTest.java  |  6 +-
 .../applications/queries/QueryTestBase.java     |  2 +-
 .../queries/SelectMappingsQueryTest.java        | 14 +--
 .../usergrid/rest/management/AccessTokenIT.java |  8 +-
 .../usergrid/rest/management/AdminUsersIT.java  | 33 ++++----
 .../rest/management/ExportResourceIT.java       |  2 +-
 .../rest/management/ImportResourceIT.java       | 14 +--
 .../rest/management/ManagementResourceIT.java   | 18 ++--
 .../rest/management/OrganizationsIT.java        | 16 ++--
 .../rest/management/RegistrationIT.java         |  4 +-
 .../rest/test/resource/AbstractRestIT.java      | 12 ++-
 .../resources/usergrid-custom-test.properties   |  7 +-
 .../services/notifications/QueueListener.java   | 15 ++--
 .../services/notifications/gcm/GCMAdapter.java  |  3 +-
 .../org/apache/usergrid/ServiceApplication.java |  2 +-
 .../apache/usergrid/management/EmailFlowIT.java |  2 +-
 .../org/apache/usergrid/management/RoleIT.java  |  5 +-
 .../usergrid/services/CollectionServiceIT.java  |  2 +-
 .../usergrid/services/GroupServiceIT.java       |  5 +-
 .../usergrid/services/ServiceInvocationIT.java  |  6 +-
 .../AbstractServiceNotificationIT.java          | 11 +--
 .../apns/NotificationsServiceIT.java            | 40 +++++----
 .../gcm/NotificationsServiceIT.java             | 15 ++--
 .../resources/usergrid-custom-test.properties   | 15 ++--
 86 files changed, 605 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1071842..cdb4fc7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -3089,52 +3089,16 @@ public class CpEntityManager implements EntityManager {
         managerCache.getEntityIndex(applicationScope).addIndex(newIndexName, shards, replicas, writeConsistency);
     }
 
+
     @Override
     public void initializeIndex(){
         managerCache.getEntityIndex(applicationScope).initialize();
     }
-    /**
-     * TODO, these 3 methods are super janky.  During refactoring we should clean this model up
-     */
-    public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
-        try {
-            long start = System.currentTimeMillis();
-            // refresh special indexes without calling EntityManager refresh because stack overflow
-            Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>();
-            map.put("some prop", "test");
-            boolean hasFinished = false;
-            Entity refreshEntity = create("refresh", map);
-            EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo
-                = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
 
-            try {
-                for (int i = 0; i < 20; i++) {
-                    if (searchCollection(
-                        new SimpleEntityRef(
-                            org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()),
-                        InflectionUtils.pluralize("refresh"),
-                        Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'")
-                    ).size() > 0
-                        ) {
-                        hasFinished = true;
-                        break;
-                    }
-                    int sleepTime = 500;
-                    logger.info("Sleeping {} ms during refreshIndex", sleepTime);
-                    Thread.sleep(sleepTime);
 
-                    indexRefreshCommandInfo
-                        = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
-                }
-                if(!hasFinished){
-                    throw new RuntimeException("Did not find entity {} during refresh. uuid->"+refreshEntity.getUuid());
-                }
-            }finally {
-                delete(refreshEntity);
-            }
-            Thread.sleep(100);
-
-            return indexRefreshCommandInfo;
+    public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
+        try {
+            return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
         } catch (Exception e) {
             throw new RuntimeException("refresh failed",e);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 81960f5..2ba6c0b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -36,6 +36,7 @@ import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 
+import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.DISTRIBUTED;
 import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL;
 
 
@@ -121,6 +122,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
             asyncEventService.MAX_TAKE = 1000;
         }
 
+        if ( impl.equals( DISTRIBUTED )) {
+            asyncEventService.MAX_TAKE = 500;
+        }
+
         return asyncEventService;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/Application.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/Application.java b/stack/core/src/test/java/org/apache/usergrid/Application.java
index 378a4f7..102ee9c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/Application.java
+++ b/stack/core/src/test/java/org/apache/usergrid/Application.java
@@ -152,7 +152,9 @@ public interface Application extends TestRule {
 
     public void remove( EntityRef entityRef ) throws Exception;
 
-    public void refreshIndex();
+    public void waitForQueueDrainAndRefreshIndex(int waitTimeMillis);
+
+    public void waitForQueueDrainAndRefreshIndex();
 
     /**
      * Get the entity manager

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 9046f02..f505ead 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -181,8 +181,8 @@ public class CoreApplication implements Application, TestRule {
 
         logger.info( "Created new application {} in organization {}", appName, orgName );
 
-//        //wait for the index before proceeding
-//        em.refreshIndex();
+        //wait for the index before proceeding
+        waitForQueueDrainAndRefreshIndex(500);
 
     }
 
@@ -223,19 +223,21 @@ public class CoreApplication implements Application, TestRule {
         return em.get( new SimpleEntityRef( type, id ) );
     }
 
-
     @Override
-    public synchronized void refreshIndex() {
-        //Insert test entity and find it
-        setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid());
-
-        if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
-            setup.getEmf().refreshIndex(em.getApplicationId());
+    public synchronized void waitForQueueDrainAndRefreshIndex(int waitTimeMillis) {
+        try{
+            Thread.sleep(waitTimeMillis);
+        } catch (InterruptedException e ){
+            //noop
         }
-
         em.refreshIndex();
     }
 
+    @Override
+    public synchronized void waitForQueueDrainAndRefreshIndex() {
+        waitForQueueDrainAndRefreshIndex(750);
+    }
+
 
     @Override
     public EntityManager getEntityManager() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
index 64b001c..bd6ae3e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
@@ -156,16 +156,28 @@ public class CoreITSetupImpl implements CoreITSetup, TestEntityIndex {
     @Override
     public void refresh(UUID appId){
         try {
-            Thread.sleep(50);
-        } catch (InterruptedException ie){
 
+            Thread.sleep(125);
+
+        } catch (InterruptedException ie){
+            //noop
         }
+
         emf.refreshIndex(appId);
 
+    }
+
+    @Override
+    public void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis){
         try {
-            Thread.sleep(50);
-        } catch (InterruptedException ie){
 
+            Thread.sleep(waitTimeMillis);
+
+        } catch (InterruptedException ie){
+            //noop
         }
+
+        emf.refreshIndex(appId);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
index 7da187a..e5e979e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
+++ b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
@@ -26,4 +26,5 @@ import java.util.UUID;
  */
 public interface TestEntityIndex {
     void refresh(UUID appId);
+    void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
index 9f1c9a4..55ce26e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
@@ -48,8 +48,8 @@ public class AggregationServiceTest extends AbstractCoreIT {
         props.put("name", "myname");
         Entity entity1 = this.app.getEntityManager().create("test", props);
         Entity entity2 = this.app.getEntityManager().create("test2", props);
-        this.app.refreshIndex();
-        Thread.sleep(500);
+
+        this.app.waitForQueueDrainAndRefreshIndex(500);
 
         long sum = aggregationService.getApplicationSize(applicationScope);
 
@@ -57,23 +57,24 @@ public class AggregationServiceTest extends AbstractCoreIT {
         Assert.assertTrue(sum > (entity1.getSize() + entity2.getSize()));
 
         long sum1 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
-        Assert.assertEquals(sum1, entity1.getSize());
+        Assert.assertEquals(entity1.getSize(), sum1);
 
         long sum2 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "test2s"));
-        Assert.assertEquals(sum2, entity2.getSize());
+        Assert.assertEquals(entity2.getSize(), sum2);
 
         props = new HashMap<>();
         props.put("test", 1234);
         props.put("name", "myname2");
         Entity entity3 = this.app.getEntityManager().create("test", props);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex(500);
+
         long sum3 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
-        Assert.assertEquals(sum3, entity1.getSize() + entity3.getSize());
+        Assert.assertEquals(entity1.getSize() + entity3.getSize(), sum3);
 
         Map<String,Long> sumEach = aggregationService.getEachCollectionSize(applicationScope);
         Assert.assertTrue(sumEach.containsKey("tests") && sumEach.containsKey("test2s"));
-        Assert.assertEquals(sum3, (long) sumEach.get("tests"));
+        Assert.assertEquals((long) sumEach.get("tests"), sum3);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index abe2615..0abd7a2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -105,7 +104,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         Entity thing = em.create("thing", new HashMap<String, Object>() {{
             put("name", "thing1");
         }});
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
         assertEquals(1, queryCollectionCp("things", "thing", "select *").size());
@@ -116,7 +115,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         em.updateProperties(thing, new HashMap<String, Object>() {{
             put("stuff", "widget");
         }});
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing);
@@ -161,7 +160,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             }}));
             Thread.sleep( writeDelayMs );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         CandidateResults crs = queryCollectionCp( "things", "thing", "select *");
         Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
@@ -210,7 +209,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         Thread.sleep(250); // delete happens asynchronously, wait for some time
 
         //refresh the app index
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(250); // refresh happens asynchronously, wait for some time
 
@@ -231,7 +230,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                }
             });
             //refresh the app index
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
 
             crs = queryCollectionCp("things", "thing", "select *");
 
@@ -265,7 +264,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                 put("name", dogName);
             }}));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         CandidateResults crs = queryCollectionCp( "dogs", "dog", "select *");
         Assert.assertEquals("Expect no stale candidates yet", numEntities, crs.size());
@@ -288,7 +287,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             }
 
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // wait for indexes to be cleared for the deleted entities
         count = 0;
@@ -296,7 +295,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         do {
             //trigger the repair
             queryCollectionEm("dogs", "select * order by created");
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
             crs = queryCollectionCp("dogs", "dog", "select *");
         } while ( crs.size() != numEntities && count++ < 15 );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index 9ad90eb..547691f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
 import com.google.inject.Injector;
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.service.AggregationService;
 import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -32,7 +31,6 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.junit.Assert;
@@ -65,7 +63,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             map.put("somekey", UUID.randomUUID());
            Entity entity = entityManager.create("tests", map);
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(500);
         ApplicationScope appScope  = CpNamingUtils.getApplicationScope(entityManager.getApplicationId());
         Observable<Id> ids =
@@ -76,7 +74,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             this.app.getApplicationService().deleteAllEntities(appScope, 5);
         count = ids.count().toBlocking().last();
         Assert.assertEquals(count, 5);
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         Injector injector = SpringResource.getInstance().getBean(Injector.class);
         GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class);
         GraphManager graphManager = factory.createEdgeManager(appScope);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 3305e0e..f484f4f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -132,7 +132,7 @@ public class CollectionIT extends AbstractCoreIT {
         activity3 = app.get( activity3.getUuid(), activity3.getType() );
         app.addToCollection( user, "activities", activity3 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // empty query
         Query query = new Query();
@@ -259,7 +259,7 @@ public class CollectionIT extends AbstractCoreIT {
         activity3 = app.get(activity3.getUuid(), activity3.getType());
         app.addToCollection(user, "activities", activity3);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // empty query
         Query query = new Query();
@@ -295,7 +295,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -315,7 +315,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.update( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // search with the old username, should be no results
         query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -354,7 +354,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "middlename = '" + middleName + "'" );
@@ -386,7 +386,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "lastname = '" + lastName + "'" );
@@ -434,7 +434,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put("nickname", "ed");
         em.updateProperties(user1, properties);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         final Query query = Query.fromQL( "nickname = 'ed'" );
@@ -469,7 +469,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity group = em.create( "group", properties );
         assertNotNull( group );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "name = '" + groupName + "'" );
@@ -501,7 +501,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity group = em.create( "group", properties );
         assertNotNull( group );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
 
@@ -559,7 +559,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.addToCollection( user, "activities", em.create( "activity", properties ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Query query = Query.fromQL( "verb = 'post'" );
 
@@ -593,7 +593,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = new Query();
@@ -636,7 +636,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = new Query();
@@ -677,7 +677,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "orquerygame", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = Query
@@ -756,7 +756,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = Query.fromQL(
@@ -817,7 +817,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // simple not
         Query query = Query.fromQL( "select * where NOT keywords contains 'game'" );
@@ -893,7 +893,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity entity2 = em.create( "game", properties );
         assertNotNull( entity2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
 
         // search for games without sub-field Foo should returned zero entities
@@ -949,7 +949,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put("keywords", "Action, New");
         em.create( "game", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * where keywords contains 'hot' or title contains 'hot'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -980,7 +980,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put( "keywords", "Action, New" );
         Entity thirdGame = em.create( "game", properties );
 
-        app.refreshIndex();//need to track all batches then resolve promises
+        app.waitForQueueDrainAndRefreshIndex();//need to track all batches then resolve promises
 
         Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -1011,7 +1011,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = new Query();
         query.setLimit( 50 );
@@ -1039,7 +1039,7 @@ public class CollectionIT extends AbstractCoreIT {
             numDeleted++;
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // wait for indexes to be cleared
         Thread.sleep(1000); //TODO find why we have to wait.  This is a bug
@@ -1096,7 +1096,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         int pageSize = 10;
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         final Query query = Query.fromQL( "index < " + size * 2 + " order by index asc" );
 
         Results r = null;
@@ -1147,7 +1147,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         int pageSize = 10;
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * where index >= " + size / 2 + " sort by index asc" );
         query.setLimit( pageSize );
@@ -1201,7 +1201,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int pageSize = 10;
 
@@ -1254,7 +1254,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int pageSize = 5;
 
@@ -1310,7 +1310,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", root );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "rootprop1 = 'simpleprop'" );
         Entity entity;
@@ -1357,7 +1357,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", jsonData );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "intprop = 10" );
 
@@ -1416,7 +1416,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", props );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "myString = 'My simple string'" );
 
@@ -1441,7 +1441,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select username, email where username = 'edanuff'";
         Query query = Query.fromQL( s );
@@ -1471,7 +1471,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select {name: username, email: email} where username = 'edanuff'";
         Query query = Query.fromQL( s );
@@ -1503,7 +1503,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         final Entity entity = em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select * where username = 'ed@anuff.com'";
         Query query = Query.fromQL( s );
@@ -1525,7 +1525,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.createConnection( foo, "testconnection", entity );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now query via the testConnection, this should work
 
@@ -1569,7 +1569,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "loveobject", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         location = new LinkedHashMap<String, Object>();
         location.put( "Place", "Via Pietro Maroncelli, 48, 62012 Santa Maria Apparente Province of Macerata, Italy" );
@@ -1587,7 +1587,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "loveobject", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // String s = "select * where Flag = 'requested'";
         // String s = "select * where Flag = 'requested' and NOT Recipient.Username =
@@ -1632,7 +1632,7 @@ public class CollectionIT extends AbstractCoreIT {
             createdEntities.add( created );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getCollection( em.getApplicationRef(), "users", null, 50, Level.ALL_PROPERTIES, false );
 
@@ -1729,7 +1729,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();
@@ -1763,7 +1763,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();
@@ -1797,7 +1797,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity createUser2 = em.create( user2 );
         assertNotNull( createUser2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
index 63c7cb8..596ec7c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
@@ -74,7 +74,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
         properties.put( "username", "testuser" );
         properties.put( "email", "test@foo.bar" );
         Entity created = em.create( "user", properties );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Entity returned = em.get( created.getUuid() );
 
@@ -89,7 +89,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
 
 
             Entity connectedEntity = em.create( "user", connectedProps );
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
 
             // Connect from our new entity to our root one so it's updated when paging
             em.createConnection( connectedEntity, "following", returned );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
index 296bf53..e1e24c4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
@@ -64,7 +64,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         assertEquals( 1, connectionTypes.size());
         assertEquals("likes", connectionTypes.iterator().next());
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.IDS);
 
@@ -128,7 +128,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catB.getUuid() + "\n" );
         em.createConnection( awardA, "awarded", catB );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // List forward connections for cat A
 
@@ -149,7 +149,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catA.getUuid() + "\n" );
         em.createConnection( awardA, "awarded", catA );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // List forward connections for cat A
 // Not valid with current usages
@@ -256,7 +256,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 
         em.createConnection( secondUserEntity, "likes", arrogantbutcher );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", "restaurant", Level.IDS);
 
@@ -310,7 +310,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 
         em.createConnection( fredEntity, "likes", wilmaEntity );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
 //        // search for "likes" edges from fred
 //        assertEquals( 1,
@@ -363,7 +363,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         em.createConnection( fredEntity, "likes", JohnEntity );
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now query via the testConnection, this should work
 
@@ -410,7 +410,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         }
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.ALL_PROPERTIES) ;
 
@@ -453,7 +453,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 //
 //        em.createConnection( fredEntity, "likes", wilmaEntity );
 //
-//        app.refreshIndex();
+//        app.waitForQueueDrainAndRefreshIndex();
 //
 ////        // search for "likes" edges from fred
 ////        assertEquals( 1,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
index cb3a728..e1e4a05 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
@@ -75,7 +75,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username" ) );
         assertEquals( "user.email not expected value", "ed@anuff.com", user.getProperty( "email" ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         EntityRef userRef = em.getAlias( new SimpleEntityRef( "application", applicationId ), "users", "edanuff" );
 
@@ -274,13 +274,13 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity thing = em.create( "thing", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         logger.info( "Starting entity delete" );
         em.delete( thing );
         logger.info( "Entity deleted" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now search by username, no results should be returned
 
@@ -310,13 +310,13 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         logger.info( "Starting entity delete" );
         em.delete( user );
         logger.info( "Entity deleted" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now search by username, no results should be returned
 
@@ -335,7 +335,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         user = em.create( "user", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Query userNameQuery = Query.fromQL( "username = '" + name + "'" );
 
@@ -456,7 +456,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         EntityRef appRef = em.get( new SimpleEntityRef( "application", app.getId() ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getCollection( appRef, "things", null, 10, Level.ALL_PROPERTIES, false );
 
@@ -548,7 +548,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         Entity createdDevice = em.createItemInCollection( createdUser, "devices", "device", device );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Entity returnedDevice = em.get( new SimpleEntityRef( "device", createdDevice.getUuid() ) );
 
@@ -580,7 +580,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity user = em.create( "robot", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( em.get( user.getUuid() ) );
     }
@@ -608,7 +608,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         em.addToCollection(appRef, "fluffies", entityRef);
         em.addToCollection(appRef, "fluffies", entityRef);
 
-        //app.refreshIndex();
+        //app.waitForQueueDrainAndRefreshIndex();
 
         Results results = em.getCollection(appRef,
             "fluffies", null, 10, Level.ALL_PROPERTIES, true);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
index b7d708e..df77084 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
@@ -96,7 +96,7 @@ public class GeoIT extends AbstractCoreIT {
         assertNotNull(hotel);
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //2. Query with a globally large distance to verify location
 
@@ -142,7 +142,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //2. Query with a globally large distance to verify location
         Query query = Query.fromQL("select * where location within " + Integer.MAX_VALUE + " of 0, 0");
@@ -154,7 +154,7 @@ public class GeoIT extends AbstractCoreIT {
         user.getDynamicProperties().remove("location");
         em.updateProperties(user, properties);
         em.update(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //4. Repeat the query, expecting no results
         listResults = em.searchCollection(em.getApplicationRef(), "users", query);
@@ -188,7 +188,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final double lat = 37.776753;
         final double lon = -122.407846;
@@ -234,7 +234,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final double lat = 37.776753;
         final double lon = -122.407846;
@@ -284,12 +284,12 @@ public class GeoIT extends AbstractCoreIT {
 
         Entity user = em.create("user", userProperties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //3. Create a connection between the user and the entity
         em.createConnection(user, "likes", restaurant);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //4. Test that the user is within 2000m of the entity
         Results emSearchResults = em.searchTargetEntities(user,
             Query.fromQL("location within 5000 of "
@@ -326,7 +326,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
             logger.debug("Entity {} created", entity.getProperty("name"));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //2. validate the size of the result
         Query query = new Query();
         Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -367,7 +367,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
             logger.debug("Entity {} created", entity.getProperty("name"));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //2. validate the size of the result
         Query query = new Query();
         Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -540,7 +540,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
          // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
         // just to be save
@@ -596,7 +596,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(2000);
 
         // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
@@ -669,7 +669,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
          // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
         // just to be save
@@ -729,7 +729,7 @@ public class GeoIT extends AbstractCoreIT {
             created.add(e);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int startDelta = size - min;
 
@@ -794,7 +794,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //do a direct geo iterator test.  We need to make sure that we short circuit on the correct tile.
 
@@ -838,7 +838,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
         }
         //3. refresh the index
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //4. return the entity manager
         return em;
     }
@@ -857,7 +857,7 @@ public class GeoIT extends AbstractCoreIT {
         latlong.put("longitude", longitude);
 
         em.setProperty(entity, "location", latlong);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
index 9a3f5a6..609f977 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
@@ -79,7 +79,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // define center point about 300m from that location
         final double lat = 37.774277;
@@ -158,7 +158,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
         Entity userFred = em.create( "user", properties );
         assertNotNull( userFred );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // define center point about 300m from that location
         final double lat = 37.774277;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
index d62f88e..5933b57 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
@@ -60,7 +60,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int i = 0;
 
@@ -133,7 +133,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "name < 'delta' order by name asc" );
         Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -261,7 +261,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "group = 1 order by name desc" );
         Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -290,7 +290,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.create("names", entity1);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL("select status where status = 'pickled'");
@@ -338,7 +338,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.createConnection( entity2Ref, "connecting", entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -357,7 +357,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.update( entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //query and check the status has been updated, shouldn't return results
         query = Query.fromQL( "select * where status = 'pickled'" );
@@ -413,7 +413,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.createConnection( entity2Ref, "connecting", entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -432,7 +432,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.update( entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //query and check the status has been updated, shouldn't return results
         query = Query.fromQL( "select * where status = 'pickled'" );
@@ -500,7 +500,7 @@ public class IndexIT extends AbstractCoreIT {
         }};
         em.create("names", entity2);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // simple single-field select mapping
         {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
index e6ecf97..329a5be 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Query.Level;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -63,7 +62,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
 
@@ -135,7 +134,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // pick an arbitrary group, ensure it has 7 users
         Results ru = em.getCollection( groups.get( 2 ), "users", null, 20, Level.IDS, false );
@@ -152,7 +151,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // pick an arbitrary user, ensure it has 7 devices
         Results rd = em.getCollection( users.get( 6 ), "devices", null, 20, Level.IDS, false );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
index 11f0692..1072d29 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
@@ -28,8 +28,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.RandomStringUtils;
-
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.entities.Role;
 import org.apache.usergrid.persistence.Query.Level;
@@ -147,7 +145,7 @@ public class PermissionsIT extends AbstractCoreIT {
         dump( "group roles", roles );
 
         em.deleteGroupRole( group.getUuid(), "author" );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         roles = em.getGroupRoles( group.getUuid() );
@@ -156,7 +154,7 @@ public class PermissionsIT extends AbstractCoreIT {
 
         em.addUserToGroupRole( user.getUuid(), group.getUuid(), "admin" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Results r = em.getUsersInGroupRole( group.getUuid(), "admin", Level.ALL_PROPERTIES );
         dump( "entities", r.getEntities() );
         assertEquals( "proper number of users in group role not set", 1, r.size() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index 383d620..a7759de 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -125,7 +125,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
         }
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -163,6 +163,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         waitForRebuild( status, reIndexService );
 
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read the catherder collection and not the catshepard
 
@@ -233,7 +234,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
         }
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(15000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -247,6 +248,8 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         deleteIndex( em.getApplicationId() );
 
+        app.waitForQueueDrainAndRefreshIndex();
+
         // ----------------- test that we can read them, should fail
 
         // deleting sytem app index will interfere with other concurrently running tests
@@ -283,7 +286,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -292,7 +294,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(15000);
         readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
     }
 
@@ -343,7 +345,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -392,7 +394,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -401,7 +402,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(5000);
         results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 3 );
         assertEquals(results.size(),3);
         q = Query.fromQL("select * where location within 100 of "+lat+", "+lon);
@@ -435,7 +436,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         final Entity secondEntity = em.create( "thing",  entityData);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -493,7 +494,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -502,7 +502,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(5000);
         countEntities( em, collectionName, 1 );
     }
 
@@ -547,14 +547,14 @@ public class RebuildIndexTest extends AbstractCoreIT {
         );
 
         ei.deleteApplication().toBlocking().lastOrDefault( null );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
     }
 
 
     private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
         throws Exception {
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
         Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
@@ -593,7 +593,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
     private int countEntities( EntityManager em, String collectionName, int expectedEntities)
            throws Exception {
 
-           app.refreshIndex();
+           app.waitForQueueDrainAndRefreshIndex();
 
            Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
            Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index d287d7e..3652b6f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -26,11 +26,8 @@ import java.util.UUID;
 import org.apache.usergrid.Application;
 import org.apache.usergrid.CoreApplication;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
 import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.utils.UUIDUtils;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,8 +40,6 @@ import org.apache.usergrid.persistence.cassandra.util.TraceTagManager;
 import org.apache.usergrid.persistence.cassandra.util.TraceTagReporter;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.setup.ConcurrentProcessSingleton;
-import rx.functions.Func0;
-import rx.functions.Func1;
 import rx.functions.Func2;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -140,7 +135,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
             Thread.sleep( 500 );
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
 
         // wait for it to appear in delete apps list
@@ -164,7 +159,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
         // delete the application
         setup.getEmf().deleteApplication(deletedAppId);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         found = findApps.call( deletedAppId, emf.getDeletedApplications() );
 
@@ -196,14 +191,14 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
             }
         }while (status.getStatus()!= ReIndexService.Status.COMPLETE);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         // test to see that app now works and is happy
 
         // it should not be found in the deleted apps collection
         found = findApps.call( deletedAppId, emf.getDeletedApplications());
         assertFalse("Restored app found in deleted apps collection", found);
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         apps = setup.getEmf().getApplications();
         found = findApps.call(deletedAppId, apps);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
index e5c84f8..1f53c0a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
@@ -76,7 +76,7 @@ public class ConnectionHelper extends CollectionIoHelper {
     @Override
     public Results getResults( Query query ) throws Exception {
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         query.setConnectionType( CONNECTION );
         query.setEntityType( "test" );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
index cea3b35..dcc8eae 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
@@ -131,7 +131,7 @@ public class IntersectionTransitivePagingIT{
 
 
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
         return expected;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
index 4d60164..3403dc8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
@@ -135,7 +135,7 @@ public class IntersectionUnionPagingIT {
 
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
index b2003fe..dac3f68 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
@@ -298,7 +298,7 @@ public class IteratingQueryIT {
             //we have to sleep, or we kill embedded cassandra
 
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
         long stop = System.currentTimeMillis();
 
@@ -367,7 +367,7 @@ public class IteratingQueryIT {
                 expected.add( name );
             }
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -438,7 +438,7 @@ public class IteratingQueryIT {
             }
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -551,7 +551,7 @@ public class IteratingQueryIT {
                 expectedResults.add( name );
             }
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -623,7 +623,7 @@ public class IteratingQueryIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -689,7 +689,7 @@ public class IteratingQueryIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -742,7 +742,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -803,7 +803,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -865,7 +865,7 @@ public class IteratingQueryIT {
             expected.add( name );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -924,7 +924,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(500);
         long stop = System.currentTimeMillis();
@@ -987,7 +987,7 @@ public class IteratingQueryIT {
             expected.add( name );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -1050,7 +1050,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -1110,7 +1110,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -1216,7 +1216,7 @@ public class IteratingQueryIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * order by boolean desc, index asc" );
         query.setLimit( queryLimit );
@@ -1322,7 +1322,7 @@ public class IteratingQueryIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query =
             Query.fromQL( "select * where intersect = true OR intersect2 = true order by created, intersect desc" );
@@ -1384,7 +1384,7 @@ public class IteratingQueryIT {
 
             io.writeEntity( entity );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
index f7308da..3f5573f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
@@ -132,7 +132,7 @@ public class NotSubPropertyIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         return expected;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
index 60c1622..89641a8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
@@ -72,7 +72,7 @@ public class ParenthesisProblemIT extends AbstractCoreIT {
             put("age",1);
         }});
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Results entities = em.searchCollection( em.getApplicationRef(), "cats", Query.fromQL(query));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index c544967..8f9058d 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -49,6 +49,9 @@ collection.uniquevalues.authoritative.region=us-east
 # Queueing Test Settings
 # Reduce the long polling time for the tests
 queue.long.polling.time.millis=50
+elasticsearch.worker_count=8
+elasticsearch.worker_count_utility=8
+queue.get.timeout.seconds=8
 
 # --- End: Usergrid cluster/actor system settings
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 061807b..778274e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -165,7 +165,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
     long getMaxShardSize();
 
     @Key(QUEUE_LONG_POLL_TIME_MILLIS)
-    @Default("5000")
+    @Default("1000")
     long getLongPollTimeMillis();
 
     /** Max time-to-live for queue message and payload data */
@@ -174,7 +174,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
     int getMaxTtlSeconds();
 
     @Key(QUEUE_IN_MEMORY)
-    @Default("true")
+    @Default("false") // in-memory queue is not ready yet; leave this set to false, otherwise messages could be processed more than once
     boolean getInMemoryCache();
 
     @Key(QUEUE_IN_MEMORY_REFRESH_ASYNC)

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 09bb8de..fa5ee0b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -59,7 +59,7 @@ public class InMemoryQueue {
         }
     }
 
-    public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
+    synchronized public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
 
         UUID newest = newestByQueueName.get( queueName );
         if ( newest == null ) {
@@ -76,7 +76,7 @@ public class InMemoryQueue {
         getQueue( queueName ).add( databaseQueueMessage );
     }
 
-    public UUID getNewest( String queueName ) {
+    synchronized public UUID getNewest( String queueName ) {
         if ( getQueue( queueName ).isEmpty() ) {
             newestByQueueName.remove( queueName );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index ac2857f..fd4257b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 
 @Singleton
@@ -188,7 +189,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
                         queueMessage.setData( json );
 
                     } catch (UnsupportedEncodingException e) {
-                        logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e);
+                        logger.error("Error decoding data for messageId=" + queueMessage.getMessageId(), e);
                     }
                 } else {
                     try {
@@ -201,6 +202,12 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
                 }
 
                 queueMessages.add( queueMessage );
+            } else if ( (System.currentTimeMillis() - dbMessage.getQueuedAt()) > TimeUnit.HOURS.toMillis(2) ) {
+                logger.warn("Queue Message does not have corresponding data after 2 hours, removing from queue - " +
+                    "queueName: {}, region: {}, queueMessageId: {}", dbMessage.getQueueName(), dbMessage.getRegion(),
+                    dbMessage.getQueueMessageId());
+                queueMessageSerialization.deleteMessage(dbMessage.getQueueName(), dbMessage.getRegion(),
+                    dbMessage.getShardId(), dbMessage.getType(), dbMessage.getQueueMessageId());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 89c79ec..eb26b69 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -177,7 +178,7 @@ public class QueueActorHelper {
             }
         }
 
-        newestFetchedUuid.put( queueName, since );
+        updateUUIDPointer(queueName, since);
 
 //        Shard currentShard = multiShardIterator.getCurrentShard();
 //        if ( currentShard != null ) {
@@ -279,7 +280,7 @@ public class QueueActorHelper {
     }
 
 
-    void queueRefresh( String queueName ) {
+    synchronized void queueRefresh( String queueName ) {
 
         Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
 
@@ -327,7 +328,7 @@ public class QueueActorHelper {
 
                 startingShards.put( shardKey, shardId );
 
-                lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+                updateLastRefreshedTime(queueName);
 
                 if ( count > 0 ) {
                     Object shard = shardIdOptional.isPresent() ? shardIdOptional.get() : "null";
@@ -346,4 +347,12 @@ public class QueueActorHelper {
         return queueName + "_" + type + region;
     }
 
+    private synchronized void updateUUIDPointer(String queueName, UUID newUUIDPointer){
+        newestFetchedUuid.put( queueName, newUUIDPointer );
+    }
+
+    private synchronized void updateLastRefreshedTime(String queueName){
+        lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 1ff8502..cbc7245 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -133,7 +133,7 @@ public class QueueActorRouter extends UntypedActor {
                     getContext().dispatcher(),
                     getSelf() );
                 shardAllocationSchedulersByQueueName.put( queueName, scheduler );
-                logger.debug( "Created shard allocater for queue {}", queueName );
+                logger.debug( "Created shard allocator for queue {}", queueName );
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 19059e6..75c1c22 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -139,8 +139,8 @@ public class ShardAllocator extends UntypedActor {
                 shardSerialization.createShard( newShard );
                 shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 );
 
-                logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
-                        this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+                logger.info("Allocated new shard for queue, newShardID: {}, queueName: {}, shardMessageCount: {}, usedPercent: {}%",
+                    newShard.getShardId(), queueName, counterValue, (long)((double)counterValue/(double)qakkaFig.getMaxShardSize()*100) );
 
             } else {
 //                logger.debug("No new shard for queue {} counterValue {} of max {}",


[2/3] usergrid git commit: Switch DISTRIBUTED database queueing to default not cache in memory as the in memory impl causes duplicate message processing quite often at the moment.

Posted by mr...@apache.org.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 98e055a..b1f72aa 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.qakka.distributed.impl;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.*;
@@ -42,7 +44,9 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
+import scala.concurrent.Promise;
 
 import java.lang.reflect.Method;
 import java.util.*;
@@ -235,20 +239,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
 
         if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
-            logger.error("Akka Actor System is not ready yet for requests.");
-            return Collections.EMPTY_LIST;
+            logger.warn("Akka Actor System is not ready yet for requests.");
+            return Collections.emptyList();
         }
 
         int maxRetries = qakkaFig.getMaxGetRetries();
         int tries = 0;
 
+        boolean interrupted = false;
+
         QueueGetRequest request = new QueueGetRequest( queueName, count );
         while ( ++tries < maxRetries ) {
             try {
                 Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
 
                 // ask ClientActor and wait (up to timeout) for response
-
                 Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t );
                 Object responseObject = Await.result( fut, t.duration() );
 
@@ -259,8 +264,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                     if ( response != null && response instanceof QueueGetResponse) {
                         QueueGetResponse qprm = (QueueGetResponse)response;
                         if ( qprm.isSuccess() ) {
-                            if (tries > 1) {
-                                logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries );
+                            if (tries > 1 && !interrupted) {
+                                logger.warn( "getNextMessage for queue {} SUCCESS after {} tries", queueName, tries );
                             }
                         }
                         return qprm.getQueueMessages();
@@ -284,10 +289,13 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                 }
 
             } catch ( TimeoutException e ) {
-                logger.trace("TIMEOUT popping to queue " + queueName + " retrying " + tries, e );
-
-            } catch ( Exception e ) {
-                logger.debug("ERROR popping to queue " + queueName + " retrying " + tries, e );
+                logger.warn("TIMEOUT popping queue " + queueName + ", attempt:  " + tries, e );
+            } catch(InterruptedException e){
+                interrupted = true;
+                // this might happen, retry the ask again
+                logger.trace("Thread was marked interrupted so unable to wait for the result, attempt: {}", tries);
+            }catch ( Exception e ) {
+                logger.error("ERROR popping queue " + queueName + ", attempt: " + tries, e );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index 11f3d08..4745cb1 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -181,6 +181,9 @@ public class ShardAllocatorTest extends AbstractAkkaTest {
 
         distributedQueueService.refresh();
 
+        // the shard allocator kicks in when messages are first received
+        distributedQueueService.getNextMessages(queueName,10);
+
         try {
 
             // Create number of messages

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 94bfeff..d77e7e8 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -36,7 +36,8 @@ usergrid.cluster.seeds=us-east:localhost
 # Port used for cluster communications.
 usergrid.cluster.port=3545
 
-queue.inmemory.cache=true
+# In-memory queueing is not ready yet; leave this set to false, otherwise messages may be processed more than once
+queue.inmemory.cache=false
 
 queue.num.actors=50
 queue.sender.num.actors=100
@@ -47,7 +48,7 @@ queue.get.timeout.seconds=5
 
 # set shard size and times low for testing purposes
 queue.shard.max.size=10
-queue.shard.allocation.check.frequency.millis=1000
+queue.shard.allocation.check.frequency.millis=500
 queue.shard.allocation.advance.time.millis=200
 
 # set low for testing purposes

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
index 4e58935..a96d725 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
@@ -63,7 +63,7 @@ public class CollectionMetadataIT extends AbstractRestIT {
         e3 = this.app().collection(collectionName).post(e3);
         assertNotNull(e3);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create connections
         // e1 hates e3
@@ -73,7 +73,7 @@ public class CollectionMetadataIT extends AbstractRestIT {
         // e3 has one in (hates) connection
         this.app().collection(collectionName).entity(e1).connection("hates").entity(e3).post();
         this.app().collection(collectionName).entity(e2).connection("likes").entity(e1).post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // no query param, "all", and invalid param all the same
         checkMetadata(e1, null, "hates", "likes");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
index fa68350..922c678 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
@@ -18,17 +18,15 @@ package org.apache.usergrid.rest;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Slf4jReporter;
-import com.fasterxml.jackson.databind.JsonNode;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.MediaType;
+
 import org.apache.commons.lang3.time.StopWatch;
-import org.apache.usergrid.rest.test.resource.*;
-import org.apache.usergrid.rest.test.resource.endpoints.NamedResource;
 import org.apache.usergrid.rest.test.resource.model.*;
 import org.apache.usergrid.rest.test.resource.model.ApiResponse;
 import org.junit.After;
@@ -85,7 +83,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
 
         String unIndexedCollectionName = "notifications";
         app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create notifier
         Entity notifier = new Entity().chainPut("name", "mynotifier").chainPut("provider", "noop");
@@ -103,7 +101,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
         Token token = this.app().token().post(new Token("ed", "sesame"));
         this.clientSetup.getRestClient().token().setToken(token);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // create devices
         int devicesCount = 0;
@@ -129,7 +127,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
             devicesCount++;
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String postMeterName = getClass().getSimpleName() + ".postNotifications";
         Meter postMeter = registry.meter( postMeterName );
@@ -168,7 +166,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
         }
         registry.remove( postMeterName );
 
-        refreshIndex( );
+        waitForQueueDrainAndRefreshIndex( );
 
         logger.info("Waiting for all notifications to be sent");
         StopWatch sw = new StopWatch();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
index 1067365..9b295f0 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
@@ -61,7 +61,7 @@ public class PartialUpdateTest extends AbstractRestIT {
         String uuid = userNode.get("uuid").toString();
         assertNotNull(uuid);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String, Object> updateProperties = new LinkedHashMap<String, Object>();
         // update user bart passing only an update to a property
@@ -81,7 +81,7 @@ public class PartialUpdateTest extends AbstractRestIT {
                 fail("Update failed due to: " + uie.getResponse().readEntity(String.class));
             }
 
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
             // retrieve the user from the backend
             userNode = this.app().collection("users").entity(userNode).get();
 
@@ -123,7 +123,7 @@ public class PartialUpdateTest extends AbstractRestIT {
         } catch (ClientErrorException uie) {
             fail("Update failed due to: " + uie.getResponse().readEntity(String.class));
         }
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         userNode = this.app().collection("users").entity(userNode).get();
         assertNotNull(userNode);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
index dd2a733..383c046 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
@@ -24,7 +24,6 @@ import org.junit.Test;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -57,7 +56,7 @@ public class SystemResourceIT extends AbstractRestIT {
         for(int i =0; i<count;i++) {
             this.app().collection("tests").post(new Entity().chainPut("testval", "test"));
         }
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         QueryParameters queryParameters = new QueryParameters();
         queryParameters.addParam( "access_token", clientSetup.getSuperuserToken().getAccessToken() );
         queryParameters.addParam("confirmApplicationName", this.clientSetup.getAppName());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
index 0f9be30..a6d987b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
@@ -104,7 +104,7 @@ public class ApplicationCreateIT extends AbstractRestIT {
             .management().orgs().org( orgName ).app().post( new Application( appName ) );
         UUID appId = appCreateResponse.getEntities().get(0).getUuid();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         for ( int i=0; i<5; i++ ) {
 
             final String entityName = "entity" + i;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
index 6416cff..6521444 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
@@ -213,7 +213,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         // test that we cannot see the application in the list of applications returned
         // by the management resource's get organization's applications end-point
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         ManagementResponse orgAppResponse = clientSetup.getRestClient()
             .management().orgs().org( orgName ).apps().getOrganizationApplications();
@@ -295,7 +295,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
             .request()
             .delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         // restore the app
@@ -308,7 +308,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
             .request()
             .put( javax.ws.rs.client.Entity.entity( "", MediaType.APPLICATION_JSON )); // must send body
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // test that we can see the application in the list of applications
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
index 9f4f8aa..8dabf93 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
@@ -181,7 +181,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
             }
             logger.info( "Waiting for app to become available" );
             Thread.sleep(500);
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
         }
         assertNotNull( clientId );
         assertNotNull( clientSecret );
@@ -242,7 +242,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
 
         assertNotNull(entity);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //retrieve the app using a username and password
         QueryParameters params = new QueryParameters()
@@ -354,7 +354,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("users").post(user);
         //assert that it was saved correctly
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //add a ttl to the entity that is greater than the maximum
         entity.chainPut("grant_type", "password").chainPut("ttl", Long.MAX_VALUE);
@@ -392,7 +392,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         //save the entity
         Entity entity = this.app().collection("users").post(user);
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Retrieve an authentication token for the user, setting the TTL
         Token apiResponse = target().path( String.format( "/%s/%s/token", orgName, appName ) )
@@ -457,7 +457,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         //save the entity
         Entity entity = this.app().collection("users").post(user);
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             //Retrieve a token for the new user, setting the TTL to an invalid value
@@ -496,7 +496,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
         //save the entity
         Entity entity = this.app().collection("users").post(user);
         assertNotNull(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //Retrieve an authentication token for the user
         Token tokenResponse = this.app().getTarget( false ).path( "token" )
             .queryParam( "grant_type", "password" )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
index 144893d..616d929 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
@@ -72,7 +72,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void octetStreamOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //  post an asset entity
 
@@ -113,7 +113,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void verifyMetadataChanged() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post an entity
 
@@ -130,7 +130,7 @@ public class AssetResourceIT extends AbstractRestIT {
             .field( "name", "verifyMetadataChangedTest" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // get entity and check asset metadata
 
@@ -175,7 +175,7 @@ public class AssetResourceIT extends AbstractRestIT {
             .field( "name", "verifyMetadataChangedTest" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //verify that data was correctly written to backend
         getResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).get( ApiResponse.class );
@@ -193,14 +193,14 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPostFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post data larger than 5M
 
         byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/file-bigger-than-5M" ) );
         FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource(getOrgAppPath("foos")).post(form);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         UUID assetId = putResponse.getEntities().get(0).getUuid();
         assertNotNull(assetId);
@@ -234,7 +234,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPutFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post an entity
 
@@ -250,7 +250,7 @@ public class AssetResourceIT extends AbstractRestIT {
             .field( "foo", "bar2" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // get entity and check asset metadata
 
@@ -283,7 +283,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void largeFileInS3() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // upload file larger than 5MB
 
@@ -310,7 +310,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void fileTooLargeShouldResultInError() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // set max file size down to 6mb
 
@@ -354,7 +354,7 @@ public class AssetResourceIT extends AbstractRestIT {
     @Test
     public void deleteConnectionToAsset() throws IOException {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // create the entity that will be the asset, an image
 
@@ -378,7 +378,7 @@ public class AssetResourceIT extends AbstractRestIT {
 
         ApiResponse connectResponse = pathResource(
             getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).post( ApiResponse.class );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify connection from imagegallery to asset
 
@@ -389,7 +389,7 @@ public class AssetResourceIT extends AbstractRestIT {
         // delete the connection
 
         pathResource( getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify that connection is gone
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
index ad12975..4a9bfaa 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
@@ -205,7 +205,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void octetStreamOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //  post an asset entity
 
@@ -246,14 +246,14 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPostFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post data larger than 5M
 
         byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/file-bigger-than-5M" ) );
         FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource(getOrgAppPath("foos")).post(form);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         UUID assetId = putResponse.getEntities().get(0).getUuid();
         assertNotNull(assetId);
@@ -287,7 +287,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void multipartPutFormOnDynamicEntity() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // post an entity
 
@@ -303,7 +303,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
             .field( "foo", "bar2" )
             .field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
         ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // get entity and check asset metadata
 
@@ -336,7 +336,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void largeFileInS3() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // upload file larger than 5MB
 
@@ -363,7 +363,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void fileTooLargeShouldResultInError() throws Exception {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // set max file size down to 6mb
         setTestProperty( "usergrid.binary.max-size-mb","6" );
@@ -383,7 +383,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
 
             // attempt to get asset entity, it should contain error
 
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
             ApiResponse getResponse = pathResource( getOrgAppPath( "bars/" +assetId ) ).get( ApiResponse.class );
             Map<String, Object> fileMetadata = (Map<String, Object>)getResponse.getEntities().get(0).get("file-metadata");
             assertNotNull( fileMetadata );
@@ -403,7 +403,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
     @Test
     public void deleteConnectionToAsset() throws IOException {
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // create the entity that will be the asset, an image
 
@@ -427,7 +427,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
 
         ApiResponse connectResponse = pathResource(
             getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).post( ApiResponse.class );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify connection from imagegallery to asset
 
@@ -438,7 +438,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
         // delete the connection
 
         pathResource( getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // verify that connection is gone
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
index b453ed2..b63400a 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
@@ -69,7 +69,7 @@ public class BrowserCompatibilityTest extends org.apache.usergrid.rest.test.reso
         Entity entity = this.app().collection("things").post(payload);
         assertEquals(entity.get("name"), name);
         String uuid = entity.getAsString("uuid");
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //now get this new entity with "text/html" in the accept header
         Entity returnedEntity = this.app().collection("things").withAcceptHeader(acceptHeader).entity(entity).get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
index d72054a..bf06c21 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
@@ -136,7 +136,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
             fail("This should return a success.");
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         Collection collection = this.app().collection( "testCollections" ).collection( "_settings" ).get();
@@ -159,7 +159,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         Entity thing = this.app().collection( "testCollections" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //The above verifies the test case.
@@ -172,7 +172,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         Entity postedEntity = this.app().collection( "testCollections" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "two ='query'";
@@ -198,11 +198,11 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //next part is to delete the schema then reindex it and it should work.
         this.app().collection( "testCollections" ).collection( "_settings" ).delete();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         this.app().collection( "testCollections" ).collection( "_reindex" )
             .post(true,clientSetup.getSuperuserToken(),ApiResponse.class,null,null,false);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //Do a query to see if you can find the indexed query.
@@ -233,14 +233,14 @@ public class CollectionsResourceIT extends AbstractRestIT {
         Entity payload = new Entity();
         payload.put( "fields", "all");
         app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // post entity with two fields
         Entity testEntity = new Entity();
         testEntity.put( "one", "helper" );
         testEntity.put( "two","query" );
         app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // verify it can be queried on both fields
 
@@ -288,7 +288,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         Entity thing = this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         //Reindex and verify that the entity only has field one index.
@@ -339,7 +339,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         Entity thing = this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection collection = this.app().collection( "testCollection" ).collection( "_settings" ).get();
 
@@ -419,7 +419,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create test collection with a test entity that is partially indexed.
         Entity testEntity = new Entity();
@@ -428,7 +428,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "two ='query'";
@@ -461,7 +461,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTesting = new HashMap<>();
 
@@ -475,7 +475,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one.key = 'value'";
@@ -511,7 +511,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTesting = new HashMap<>();
 
@@ -525,7 +525,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one.key = 'value'";
@@ -554,7 +554,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTestingSelectiveIndexing = new HashMap<>();
 
@@ -573,7 +573,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one.key.wowMoreKeys = 'value'";
@@ -609,7 +609,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> arrayFieldsForTestingSelectiveIndexing = new HashMap<>();
 
@@ -629,7 +629,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "name = 'howdy'";
@@ -660,7 +660,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post index to the collection metadata
         this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create test collection with a test entity that is partially indexed.
         Entity testEntity = new Entity();
@@ -669,11 +669,11 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Post entity.
         Entity postedEntity = this.app().collection( "testCollection" ).post( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         testEntity.put( "one","three" );
         this.app().collection( "testCollection" ).entity( postedEntity.getUuid() ).put( testEntity );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Do a query to see if you can find the indexed query.
         String query = "one = 'three'";
@@ -715,7 +715,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         Entity user = this.app().collection("users").post(payload);
         assertEquals(user.get("username"), username);
         assertEquals(user.get("email"), email);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         String collectionName = "nestprofiles";
         //create a permission with the path "me" in it
@@ -743,7 +743,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         Entity nestProfile = this.app().collection(collectionName).post(payload);
         assertEquals(nestProfile.get("name"), profileName);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Entity nestprofileReturned = this.app().collection(collectionName).entity(nestProfile).get();
         assertEquals(nestprofileReturned.get("name"), profileName);
@@ -766,7 +766,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         assertEquals( calendarlistOne.get( "summaryOverview" ), summaryOverview );
         assertEquals(calendarlistOne.get("caltype"), calType);
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //post a second entity
         payload = new Entity();
@@ -819,9 +819,9 @@ public class CollectionsResourceIT extends AbstractRestIT {
         assertNotSame( null,
             ((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName.toLowerCase() ));
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         this.app().collection( collectionName ).entity( testEntity.getEntity().getUuid() ).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
 
         //Verify that the collection still exists despite deleting its only entity.)
@@ -850,7 +850,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         payload.put("name", name);
         Entity user = this.app().collection("app_users").post(payload);
         assertEquals(user.get("name"), name);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Entity user2 = this.app().collection("app_users").entity(user).get();
 
@@ -880,7 +880,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         String randomizer = RandomStringUtils.randomAlphanumeric(10);
         String collectionName = "col_" + randomizer;
         app().collection( collectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // was the no-index wildcard saved and others ignored?
         Collection collection = app().collection( collectionName ).collection( "_settings" ).get();
@@ -923,7 +923,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
         String randomizer = RandomStringUtils.randomAlphanumeric(10);
         String unIndexedCollectionName = "col_" + randomizer;
         app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String entityName1 = "unindexed1";
         Entity unindexed1 = this.app().collection( unIndexedCollectionName )
@@ -982,7 +982,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         String unIndexedCollectionName = "col_" + randomizer;
         app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String entityName1 = "unindexed1";
         Entity unindexed1 = this.app().collection( unIndexedCollectionName )
@@ -1018,7 +1018,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         app().collection( collectionName ).collection( "_settings" )
             .post( new Entity().chainPut( "fields", "all" ) );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get collection settings, should see no region
 
@@ -1051,7 +1051,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         app().collection( collectionName ).collection( "_settings" )
             .post( new Entity().chainPut( REGION_SETTING, "" ) );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get collection settings, should see no region
 
@@ -1091,14 +1091,14 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
 
         this.app().collection("notifications/"+ UUIDUtils.newTimeUUID()).post(payload );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Collection user2 = this.app().collection("notifications").get();
 
         assertEquals(1,user2.getNumOfEntities());
 
         this.app().collection("notifications/"+ UUIDUtils.newTimeUUID()).put(null,payload );
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         user2 = this.app().collection("notifications").get();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
index 0776705..7e1c5a5 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
@@ -41,7 +41,7 @@ public class DuplicateNameIT extends AbstractRestIT {
         entity.put("name", "enzo");
         //Create an entity named "enzo" in the "things" collection
         entity = this.app().collection(collectionName).post(entity);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         try {
             // Try to create a second entity in "things" with the name "enzo".
             this.app().collection(collectionName).post(entity);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
index c7f39b2..a9f5fee 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
@@ -59,7 +59,7 @@ public class ActivityResourceIT extends AbstractRestIT {
         this.activityDesc = "testActivity" ;
         this.activity = new ActivityEntity().putActor(current).chainPut("title", activityTitle).chainPut("content", activityDesc).chainPut("category", "testCategory").chainPut("verb", "POST");
         this.groupActivityResource = groupsResource.entity(entity).activities();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
     }
 
 
@@ -87,7 +87,7 @@ public class ActivityResourceIT extends AbstractRestIT {
         {
             throw e;
         }
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection results = groupActivityResource.get();
 
@@ -111,7 +111,7 @@ public class ActivityResourceIT extends AbstractRestIT {
         usersResource.entity(current).activities().post(activity);
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection results = usersResource.entity(current).activities().get();
 
@@ -136,7 +136,7 @@ public class ActivityResourceIT extends AbstractRestIT {
 
         this.app().collection("activities").post(activity);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Collection results = this.app().collection("activities").get();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
index d61d363..4ba8977 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
@@ -17,7 +17,6 @@
 package org.apache.usergrid.rest.applications.collection.activities;
 
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,7 +27,6 @@ import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
 import org.apache.usergrid.rest.test.resource.model.Collection;
 import org.apache.usergrid.rest.test.resource.model.Entity;
-import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +58,7 @@ public class PutTest extends AbstractRestIT {
             Entity activity = activities.post(new Entity(props));
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String query = "select * ";
 
@@ -72,7 +70,7 @@ public class PutTest extends AbstractRestIT {
         props.put( "actor", newActor );
         Entity activity = activities.post(new Entity(props));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         collection = activities.get(  );
         assertEquals( 6, collection.getResponse().getEntities().size() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
index 67cf19f..b73bcbd 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
@@ -17,11 +17,8 @@
 package org.apache.usergrid.rest.applications.collection.devices;
 
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
@@ -35,7 +32,6 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
-import org.junit.Ignore;
 
 import javax.ws.rs.ClientErrorException;
 
@@ -51,7 +47,7 @@ public class DevicesResourceIT extends AbstractRestIT {
 
         CollectionEndpoint devicesResource  =this.app().collection("devices");
         Entity entity = devicesResource.entity(uuid).put(payload);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create
         assertNotNull( entity );
@@ -62,7 +58,7 @@ public class DevicesResourceIT extends AbstractRestIT {
         ApiResponse deleteResponse =devicesResource.entity(uuid).delete();
         assertNotNull(deleteResponse.getEntities().get(0));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check deleted
         try {
@@ -72,7 +68,7 @@ public class DevicesResourceIT extends AbstractRestIT {
         catch ( ClientErrorException e ) {
             assertEquals( 404, e.getResponse().getStatus() );
         }
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create again
         entity = devicesResource.entity(uuid).put(payload);
@@ -80,7 +76,7 @@ public class DevicesResourceIT extends AbstractRestIT {
 
         assertNotNull( entity );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check existence
         entity = devicesResource.entity(uuid).get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
index 769852d..b94050b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.model.Collection;
 import org.apache.usergrid.rest.test.resource.model.Entity;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.ws.rs.ClientErrorException;
@@ -58,7 +57,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("groups").post(payload);
         assertEquals(entity.get("name"), groupName);
         assertEquals(entity.get("path"), groupPath);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         return entity;
     }
 
@@ -74,7 +73,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("roles").post(payload);
         assertEquals(entity.get("name"), roleName);
         assertEquals(entity.get("title"), roleTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         return entity;
     }
 
@@ -91,7 +90,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity entity = this.app().collection("users").post(payload);
         assertEquals(entity.get("username"), username);
         assertEquals(entity.get("email"), email);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         return entity;
     }
 
@@ -180,7 +179,7 @@ public class GroupResourceIT extends AbstractRestIT {
         group.put("path", newGroupPath);
         Entity groupResponse = this.app().collection("groups").entity(group).put(group);
         assertEquals(groupResponse.get("path"), newGroupPath);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //4. do a GET to verify the property really was set
         groupResponseGET = this.app().collection("groups").entity(group).get();
@@ -223,7 +222,7 @@ public class GroupResourceIT extends AbstractRestIT {
         // 3. add the user to the group
         Entity response = this.app().collection("users").entity(user).connection().collection("groups").entity(group).post();
         assertEquals(response.get("name"), groupName);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // 4. make sure the user is in the group
         Collection collection = this.app().collection("groups").entity(group).connection().collection("users").get();
@@ -237,7 +236,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
         //6. remove the user from the group
         this.app().collection("group").entity(group).connection().collection("users").entity(user).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //6. make sure the connection no longer exists
         collection = this.app().collection("group").entity(group).connection().collection("users").get();
@@ -266,12 +265,12 @@ public class GroupResourceIT extends AbstractRestIT {
         String roleName = "tester";
         String roleTitle = "tester";
         Entity role = this.createRole(roleName, roleTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //3. add role to the group
         Entity response = this.app().collection("roles").entity(role).connection().collection("groups").entity(group).post();
         assertEquals(response.get("name"), groupName);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //4. make sure the role is in the group
         Collection collection = this.app().collection("groups").entity(group).connection().collection("roles").get();
@@ -280,7 +279,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
         //5. remove Role from the group (should only delete the connection)
         this.app().collection("groups").entity(group).connection().collection("roles").entity(role).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //6. make sure the connection no longer exists
         collection = this.app().collection("groups").entity(group).connection().collection("roles").get();
@@ -294,7 +293,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
         //8. delete the role
         this.app().collection("role").entity(role).delete();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(5000);
 
         //9. do a GET to make sure the role was deleted
@@ -359,7 +358,7 @@ public class GroupResourceIT extends AbstractRestIT {
         payload.put("name", catName);
         Entity fluffy = this.app().collection("cats").post(payload);
         assertEquals(fluffy.get("name"), catName);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //10. get the cat - permissions should allow this
         fluffy = this.app().collection("cats").uniqueID(catName).get();
@@ -436,7 +435,7 @@ public class GroupResourceIT extends AbstractRestIT {
 
 
         //7. get all the users in the groups
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         Collection usersInGroup = this.app().collection("groups").uniqueID(groupName).connection("users").get();
         assertEquals(usersInGroup.getResponse().getEntityCount(), 2);
 
@@ -444,7 +443,7 @@ public class GroupResourceIT extends AbstractRestIT {
         this.app().collection("role").uniqueID("Default").delete();
         Entity data = new Entity().chainPut("name", "group1role");
         this.app().collection("roles").post(data);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         Entity perms = new Entity();
         String permission = "get,post,put,delete:/groups/" + group.getUuid() + "/**";
@@ -452,18 +451,18 @@ public class GroupResourceIT extends AbstractRestIT {
         this.app().collection("roles").uniqueID("group1role").connection("permissions").post(perms);
         this.app().collection("roles").uniqueID("group1role").connection("users").uniqueID( user1Username ).post();
         this.app().collection("roles").uniqueID("group1role").connection("users").uniqueID( user2Username ).post();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //7b. everybody gets access to /activities
         perms = new Entity();
         permission = "get:/activities/**";
         perms.put("permission",permission);
         this.app().collection("roles").uniqueID("Guest").connection("permissions").post(perms);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
         this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user1Username ).post();
         this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user2Username ).post();
         this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user3Username ).post();
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
 
         //8. post an activity to the group
@@ -482,7 +481,7 @@ public class GroupResourceIT extends AbstractRestIT {
         Entity activityResponse = this.app().collection("groups")
             .uniqueID(groupName).connection("activities").post(activity);
         assertEquals(activityResponse.get("content"), content);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         //11. log user1 in, should then be using the app user's token not the admin token
         this.getAppUserToken(user1Username, password);
@@ -565,7 +564,7 @@ public class GroupResourceIT extends AbstractRestIT {
         group.put("title", newTitle);
         Entity groupResponse = this.app().collection("groups").entity(group).put(group);
         assertEquals(groupResponse.get("title"), newTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         // update that group by giving it a new title and using UUID in URL
         String evenNewerTitle = "Even New Title";
@@ -573,6 +572,6 @@ public class GroupResourceIT extends AbstractRestIT {
         String uuid = group.getAsString("uuid");
         groupResponse = this.app().collection("groups").uniqueID(uuid).put(group);
         assertEquals(groupResponse.get("title"), evenNewerTitle);
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
index 4ca46b1..da15b2e 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
@@ -130,7 +130,7 @@ public class PagingResourceIT extends AbstractRestIT {
 
             ApiResponse response = this.app().collection( collectionName ).delete( queryParameters );
 
-            this.refreshIndex();
+            this.waitForQueueDrainAndRefreshIndex();
 
             if(validate)
                 assertEquals("Entities should have been deleted", deletePageSize,response.getEntityCount() );
@@ -268,7 +268,7 @@ public class PagingResourceIT extends AbstractRestIT {
             entityPayload.put( "name", created );
             Entity entity = new Entity( entityPayload );
             entity = this.app().collection( collectionName ).post( entity );
-            refreshIndex();
+            waitForQueueDrainAndRefreshIndex();
             if(created == 1){
                 connectedEntity = entity;
             }
@@ -277,7 +277,7 @@ public class PagingResourceIT extends AbstractRestIT {
             }
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         QueryParameters qp = new QueryParameters();
         qp.setQuery("select * order by created asc");
@@ -323,7 +323,7 @@ public class PagingResourceIT extends AbstractRestIT {
             this.app().collection( collectionName ).post( entity );
         }
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Creates query looking for entities with the very stop.
         String query = "select * where verb = 'stop'";
@@ -454,7 +454,7 @@ public class PagingResourceIT extends AbstractRestIT {
             }
         }
 
-        this.refreshIndex();
+        this.waitForQueueDrainAndRefreshIndex();
 
         return entities;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
index 6202c6a..7315cad 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
@@ -63,7 +63,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         Entity objectOfDesire = new Entity();
         objectOfDesire.put( "codingmunchies", "doritoes" );
         objectOfDesire = this.app().collection( "snacks" ).post( objectOfDesire );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Entity toddWant = this.app().collection( "users" ).entity( todd ).collection( "likes" ).collection( "snacks" )
                               .entity( objectOfDesire ).post();
@@ -93,11 +93,11 @@ public class ConnectionResourceTest extends AbstractRestIT {
         thing2.put( "name", "thing2" );
         thing2 = this.app().collection( "things" ).post( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //create the connection: thing1 likes thing2
         this.app().collection( "things" ).entity( thing1 )
             .connection("likes").collection( "things" ).entity( thing2 ).post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //test we have the "likes" in our connection meta data response
         thing1 = this.app().collection( "things" ).entity( thing1 ).get();
@@ -150,14 +150,14 @@ public class ConnectionResourceTest extends AbstractRestIT {
         thing2.put( "name", "thing2" );
         thing2 = this.app().collection( "things" ).post( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //create the connection: thing1 likes thing2
         this.app().collection( "things" ).entity( thing1 )
             .connection("likes").collection( "things" ).entity( thing2 ).post();
         //delete thing2
         this.app().collection( "things" ).entity( thing2 ).delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             //attempt to retrieve thing1
@@ -185,14 +185,14 @@ public class ConnectionResourceTest extends AbstractRestIT {
         thing2.put( "name", "thing2" );
         thing2 = this.app().collection( "things" ).post( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //create the connection: thing1 likes thing2
         this.app().collection( "things" ).entity( thing1 )
             .connection("likes").collection( "things" ).entity( thing2 ).post();
         //delete thing1
         this.app().collection( "things" ).entity( thing1 ).delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         try {
             //attempt to retrieve thing1
@@ -236,7 +236,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //connect thing1 -> thing3
         connectionEndpoint.entity( thing3 ).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //now do a GET, we should see thing2 then thing3
 
@@ -257,7 +257,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //now re-post thing 2 it should appear second
         connectionEndpoint.entity( thing2 ).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
 
         final ApiResponse order2 = connectionEndpoint.get().getResponse();
@@ -304,7 +304,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //connect thing1 -> thing3
         connectionEndpoint.entity( thing3 ).put( thing3 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //now do a GET, we should see thing2 then thing3
 
@@ -325,7 +325,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
         //now re-post thing 2 it should appear second
         connectionEndpoint.entity( thing2 ).put( thing2 );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         final ApiResponse order2 = connectionEndpoint.get().getResponse();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
index cfd08e5..3393582 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
@@ -17,19 +17,15 @@
 package org.apache.usergrid.rest.applications.collection.users;
 
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
 import org.apache.usergrid.rest.test.resource.model.*;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 
-import org.apache.usergrid.utils.MapUtils;
-
 import javax.ws.rs.ClientErrorException;
 
 import static org.junit.Assert.assertEquals;
@@ -65,7 +61,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         user1 = new User(this.usersResource.post(user1));
         user2 = new User(this.usersResource.post(user2));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
     }
 
 
@@ -95,7 +91,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         //Revoke the user1 token
         usersResource.entity(user1).connection("revoketokens").post(new Entity().chainPut("token", token.getAccessToken()));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //See if we can still access the me entity after revoking its token
         try {
@@ -127,7 +123,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         // create device 1 on user1 devices
         usersResource.entity("me").collection("devices")
                .post(new Entity( ).chainPut("name", "device1").chainPut("number", "5551112222"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Clear the current user token
         this.app().token().clearToken();
@@ -137,7 +133,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         usersResource.entity("me").collection("devices")
                 .post(new Entity( ).chainPut("name", "device2").chainPut("number", "5552223333"));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Check that we can get back device1 on user1
         token = this.app().token().post(new Token(user1.getUsername(),"password"));
@@ -236,13 +232,13 @@ public class OwnershipResourceIT extends AbstractRestIT {
         // create a 4peaks restaurant
         Entity data = this.app().collection("restaurants").post(new Entity().chainPut("name", "4peaks"));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create a restaurant and link it to user1/me
         Entity fourPeaksData = usersResource.entity("me")
                 .connection("likes").collection( "restaurants" ).entity( "4peaks" ).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // anonymous user
         this.app().token().clearToken();
@@ -252,11 +248,11 @@ public class OwnershipResourceIT extends AbstractRestIT {
 
         data = this.app().collection("restaurants")
                       .post(new Entity().chainPut("name", "arrogantbutcher"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         data = usersResource.entity("me").connection( "likes" ).collection( "restaurants" )
                       .entity( "arrogantbutcher" ).post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String arrogantButcherId = data.getUuid().toString();
 
@@ -390,7 +386,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         //Sets up the cities collection with the city tempe
         Entity city = this.app().collection("cities").post(new Entity().chainPut("name", "tempe"));
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create a 4peaks restaurant that is connected by a like to tempe.
         Entity data = this.app().collection("cities").entity( "tempe" ).connection( "likes" )
@@ -410,7 +406,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
         CollectionEndpoint likeRestaurants =
                 this.app().collection("cities").entity( "tempe" ).connection( "likes" );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check we can get the resturant entities back via uuid without a collection name
         data = likeRestaurants.entity( peaksId ).get();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
index aff952b..2dddcf6 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
@@ -66,7 +66,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         user = new User(USER,USER,USER+"@apigee.com","password");
         user = new User( this.app().collection("users").post(user));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
     }
 
 
@@ -86,13 +86,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertEquals( ROLE, node.get("name").toString() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Post the user with a specific role into the users collection
         node = this.app().collection("roles").entity(node).collection("users").entity(USER).post();
         assertNull( node.get( "error" ) );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now check the user has the role
         node =  this.app().collection("users").entity(USER).collection("roles").entity(ROLE).get();
@@ -104,7 +104,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         // now delete the role
         this.app().collection("users").entity(USER).collection("roles").entity(ROLE).delete();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // check if the role was deleted
 
@@ -136,14 +136,14 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( node.get( "error" ) );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Create a user that is in the group.
         node = this.app().collection("groups").entity(groupPath).collection("users").entity(user).post();
 
         assertNull( node.get( "error" ) );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Get the user and make sure that they are part of the group
         Collection groups = this.app().collection("users").entity(user).collection("groups").get();
@@ -157,7 +157,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( response.getError() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Check that the user no longer exists in the group
         int status = 0;
@@ -193,7 +193,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( entity.getError() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now try to add permission as the user, this should work
         addPermission( "usercreatedrole", "get,put,post:/foo/**" );
@@ -247,13 +247,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertNull( node.getError() );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // delete the default role to test permissions later
         ApiResponse response = this.app().collection("roles").entity("default").delete();
 
         assertNull( response.getError() );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Grants a permission to GET, POST, and PUT the reviews url for the reviewer role
         addPermission( "reviewer", "get,put,post:/reviews/**" );
@@ -266,22 +266,22 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         this.app().collection("groups").post(group);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Adds the reviewer to the reviewerGroup
         this.app().collection("groups").entity("reviewergroup").collection("roles").entity("reviewer").post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Adds reviewer2 user to the reviewergroup
         this.app().collection("users").entity("reviewer2").collection("groups").entity("reviewergroup").post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Adds reviewer1 to the reviewer role
         this.app().collection("users").entity("reviewer1").collection("roles").entity("reviewer").post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // Set the current context to reviewer1
         this.app().token().post(new Token("reviewer1","password"));
@@ -295,7 +295,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
             .chainPut ("rating", "4").chainPut( "name", "4peaks").chainPut("review", "Huge beer selection" );
         this.app().collection("reviews").post(review);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get the reviews and assert they were created
         QueryParameters queryParameters = new QueryParameters();
@@ -330,7 +330,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertEquals( Response.Status.UNAUTHORIZED.getStatusCode(), status );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //TODO: maybe make this into two different tests?
 
@@ -346,7 +346,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
             .chainPut( "rating", "4" ).chainPut("name", "currycorner").chainPut( "review", "Authentic" );
         this.app().collection("reviews").post(review);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // get all reviews as reviewer2
         queryParameters = new QueryParameters();
@@ -372,7 +372,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         assertEquals( Response.Status.UNAUTHORIZED.getStatusCode(), status );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         status = 0;
 
@@ -409,7 +409,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         Entity data = new Entity().chainPut("name", "reviewer");
         this.app().collection("roles").post(data);
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // allow access to reviews excluding delete
         addPermission( "reviewer",
@@ -433,13 +433,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
                         "wildcardpermusertwo@apigee.com" );
         assertNotNull( userTwoId );
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Add user1 to the reviewer role
         this.app().collection("users").entity(userOneId).collection("roles").entity("reviewer").post();
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Add a book to the books collection
         Entity book = new Entity().chainPut( "title", "Ready Player One" ).chainPut("author", "Earnest Cline");
@@ -449,7 +449,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         assertEquals( "Ready Player One", book.get("title").toString() );
         String bookId = book.get("uuid").toString();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         //Switch the contex to be that of user1
         this.app().token().post(new Token("wildcardpermuserone","password"));
@@ -461,7 +461,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
         review = this.app().collection("reviews").post(review);
         String reviewId = review.get("uuid").toString();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // POST https://api.usergrid.com/my-org/my-app/users/me/wrote/review/${reviewId}
         this.app().collection("users").entity("me").connection("wrote").collection("review").entity(reviewId).post();
@@ -469,13 +469,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
         // POST https://api.usergrid.com/my-org/my-app/users/me/reviewed/review/${reviewId}
         this.app().collection("users").entity("me").connection("reviewed").collection("books").entity(bookId).post();
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // POST https://api.usergrid.com/my-org/my-app/books/${bookId}/review/${reviewId}
         this.app().collection("books").entity(bookId).collection("review").entity(reviewId).post();
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // now try to post the same thing to books to verify as userOne does not have correct permissions
         int status = 0;
@@ -522,7 +522,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
 
         //allow patients to add doctors as their followers
         addPermission(  "patient", "delete,post:/users/*/following/users/${user}" );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         // create examplepatient
         UUID patientId =  createRoleUser( "examplepatient",  "examplepatient@apigee.com" );
@@ -531,12 +531,12 @@ public class PermissionsResourceIT extends AbstractRestIT {
         // create exampledoctor
         UUID doctorId = createRoleUser( "exampledoctor",  "exampledoctor@apigee.com" );
         assertNotNull( doctorId );
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         // assign examplepatient the patient role
         this.app().collection("users").entity(patientId).collection("roles").entity("patient").post();
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         this.app().token().post(new Token("examplepatient","password"));
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
         //not working yet, used to be ignored
         //        this.app().collection("users").entity("exampledoctor").connection("following")
         // .collection("users").entity("examplepatient").post();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
index d5f7163..ddb1557 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
@@ -17,19 +17,15 @@
 package org.apache.usergrid.rest.applications.collection.users;
 
 
-import java.util.HashMap;
 import java.util.Map;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
-import org.apache.usergrid.rest.test.resource.endpoints.EntityEndpoint;
 import org.apache.usergrid.rest.test.resource.model.Entity;
 import org.apache.usergrid.rest.test.resource.model.QueryParameters;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +51,7 @@ public class RetrieveUsersTest extends AbstractRestIT {
 
 
 
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         String query = "select *";
         String incorrectQuery = "select * where username = 'Alica'";
@@ -72,7 +68,7 @@ public class RetrieveUsersTest extends AbstractRestIT {
         props.put( "username", "Nina" );
 
         Entity entity = users.post(props);
-        refreshIndex();
+        waitForQueueDrainAndRefreshIndex();
 
         Map<String,Object> metadata = (Map)entity.get( "metadata" );
         Map<String,Object> sets = (Map)metadata.get( "sets" );