You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/04/02 23:23:18 UTC
[1/3] usergrid git commit: Switch DISTRIBUTED database queueing to
default not cache in memory as the in memory impl causes duplicate messgae
processing quite often at the moment.
Repository: usergrid
Updated Branches:
refs/heads/master 8b63aae7d -> d3e988bcb
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
index f8cb9d4e..af87ca5 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/UserResourceIT.java
@@ -75,7 +75,7 @@ public class UserResourceIT extends AbstractRestIT {
usersResource = this.app().collection("users");
userResource = this.app().collection("user");
- clientSetup.refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
}
@Test
@@ -249,14 +249,14 @@ public class UserResourceIT extends AbstractRestIT {
// same as above, but with actor partially filled out
Entity entity = usersResource.entity(userId.toString()).activities().post(activity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
UUID firstActivityId = entity.getUuid();
activity = new ActivityEntity("rod@rodsimpson.com", "POST", "activity 2");
entity = usersResource.entity(userId.toString()).activities().post(activity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
UUID secondActivityId = entity.getUuid();
@@ -284,7 +284,7 @@ public class UserResourceIT extends AbstractRestIT {
map.put("email", email);
Entity userEntity = usersResource.post(new Entity(map));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get the user with username property that has an email value
Entity testUser = usersResource.entity(username).get();
@@ -315,7 +315,7 @@ public class UserResourceIT extends AbstractRestIT {
map.put("email", email);
usersResource.post(new Entity(map));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get the user with email property value
// get the user with username property that has an email value
@@ -339,7 +339,7 @@ public class UserResourceIT extends AbstractRestIT {
map.put("email", email);
usersResource.post(new Entity(map));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get the user with email property value
// get the user with username property that has an email value
@@ -409,7 +409,7 @@ public class UserResourceIT extends AbstractRestIT {
Entity entity = usersResource.post(user);
UUID createdId = entity.getUuid();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection results = usersResource.get(new QueryParameters().setQuery(String.format("name = '%s'", name)));
entity = new User(results.getResponse().getEntities().get(0));
assertEquals(createdId, entity.getUuid());
@@ -429,13 +429,13 @@ public class UserResourceIT extends AbstractRestIT {
UUID createdId = entity.getUuid();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Entity newEntity = usersResource.entity(createdId.toString()).get();
userResource.entity(newEntity).delete();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection results = usersResource.get(
new QueryParameters().setQuery(String.format("username = '%s'", username)));
@@ -460,7 +460,7 @@ public class UserResourceIT extends AbstractRestIT {
User entity = new User(username, name, email, "password");
entity = new User(usersResource.post(entity));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
UUID firstCreatedId = entity.getUuid();
username = "username2";
@@ -470,7 +470,7 @@ public class UserResourceIT extends AbstractRestIT {
entity = new User(username, name, email, "password");
entity = new User(usersResource.post(entity));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
UUID secondCreatedId = entity.getUuid();
@@ -484,7 +484,7 @@ public class UserResourceIT extends AbstractRestIT {
assertEquals(secondCreatedId.toString(), conn1.getUuid().toString());
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Entity conn2 = usersResource.entity(
@@ -492,7 +492,7 @@ public class UserResourceIT extends AbstractRestIT {
assertEquals(secondCreatedId.toString(), conn2.getUuid().toString());
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection conn1Connections = usersResource.entity(firstCreatedId.toString()).connection("conn1").get();
@@ -542,7 +542,7 @@ public class UserResourceIT extends AbstractRestIT {
// now create a connection of "likes" between the first user and the
// second using pluralized form
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// named entity in collection name
Entity conn1 = usersResource.entity(firstCreatedId.toString()).connection("conn1", "users")
@@ -595,6 +595,7 @@ public class UserResourceIT extends AbstractRestIT {
role = new Entity();
role.put("name", "connectionQuerybyEmail2");
role = this.app().collection("roles").post(role);
+ waitForQueueDrainAndRefreshIndex();
UUID roleId2 = role.getUuid();
@@ -605,24 +606,24 @@ public class UserResourceIT extends AbstractRestIT {
perms.put("permission", "get:/stuff/**");
Entity perms2 = this.app().collection("roles").entity(roleId2.toString()).connection("permissions")
.post(new Entity(perms));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
+
//connect the entities where role is the root
Entity perms3 = this.app().collection("roles").entity(roleId1.toString()).connection("users")
.entity(userId.toString()).post();
-
- // now create a connection of "likes" between the first user and the
- // second using pluralized form
-
assertEquals(userId.toString(), perms3.getUuid().toString());
+ waitForQueueDrainAndRefreshIndex();
+ // now create a connection of "likes" between the first user and the
+ // second using pluralized form
//connect the second role
Entity perms4 = this.app().collection("roles").entity(roleId2).connection("users").entity(userId).post();
-
assertEquals(userId.toString(), perms4.getUuid().toString());
+ waitForQueueDrainAndRefreshIndex();
+
- refreshIndex();
//query the second role, it should work
Collection userRoles = this.app().collection("roles").entity(roleId2).connection("users")
.get(new QueryParameters().setQuery("select%20*%20where%20username%20=%20'" + email + "'"));
@@ -678,7 +679,7 @@ public class UserResourceIT extends AbstractRestIT {
Entity pizzaEntity = this.app().collection("pizzas").post(pizza);
UUID secondCreatedId = pizzaEntity.getUuid();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// now create a connection of "likes" between the first user and the
// second using pluralized form
@@ -708,7 +709,7 @@ public class UserResourceIT extends AbstractRestIT {
Entity userEntity = usersResource.post(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// attempt to log in
Token token = this.app().token().post(new Token(username, "password"));
@@ -728,7 +729,7 @@ public class UserResourceIT extends AbstractRestIT {
userEntity = usersResource.entity(username).put(userEntity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// now see if we've updated
@@ -782,7 +783,7 @@ public class UserResourceIT extends AbstractRestIT {
.chainPut("pin", "1234");
usersResource.post(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection response = usersResource.get();
// disable the user
@@ -798,7 +799,7 @@ public class UserResourceIT extends AbstractRestIT {
public void test_PUT_password_fail() {
Entity entity = usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
this.app().token().post(new Token("edanuff", "sesame"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
boolean fail = false;
try {
Entity changeResponse = usersResource.entity("edanuff").collection("password")
@@ -829,17 +830,17 @@ public class UserResourceIT extends AbstractRestIT {
@Test
public void test_PUT_password_ok() {
Entity entity = usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
usersResource.entity(entity).collection("password").post(new ChangePasswordEntity("sesame", "sesame1"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
this.app().token().post(new Token("edanuff", "sesame1"));
// if this was successful, we need to re-set the password for other
// tests
Entity changeResponse = usersResource.entity("edanuff").collection("password")
.post(new ChangePasswordEntity("sesame1", "sesame"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
assertNotNull(changeResponse);
}
@@ -849,14 +850,14 @@ public class UserResourceIT extends AbstractRestIT {
public void setUserPasswordAsAdmin() throws IOException {
usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
String newPassword = "foofoo";
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// change the password as admin. The old password isn't required
Entity node = usersResource.entity("edanuff").connection("password")
.post(new ChangePasswordEntity(newPassword));
assertNotNull(node);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Token response = this.app().token().post(new Token("edanuff", newPassword));
assertNotNull(response);
}
@@ -899,7 +900,7 @@ public class UserResourceIT extends AbstractRestIT {
public void testChangePassordToInvalidValue() {
Entity entity = usersResource.post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
try {
usersResource.entity(entity).collection("password").post(new ChangePasswordEntity("sesame", "abc"));
@@ -930,12 +931,12 @@ public class UserResourceIT extends AbstractRestIT {
this.app().collection("roles").post(role);
// check it
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// add Role
role = usersResource.entity(createdId).collection("roles").entity(roleName).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// check it
assertNotNull(role);
assertNotNull(role.get("name"));
@@ -966,7 +967,7 @@ public class UserResourceIT extends AbstractRestIT {
public void revokeToken() throws Exception {
this.app().collection("users").post(new User("edanuff", "edanuff", "edanuff@email.com", "sesame"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Token token1 = this.app().token().post(new Token("edanuff", "sesame"));
Token token2 = this.app().token().post(new Token("edanuff", "sesame"));
@@ -984,7 +985,7 @@ public class UserResourceIT extends AbstractRestIT {
this.app().token().setToken(adminToken);
usersResource.entity("edanuff").connection("revoketokens").post(new Entity().chainPut("token", token1));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// the tokens shouldn't work
int status = 0;
@@ -1032,7 +1033,7 @@ public class UserResourceIT extends AbstractRestIT {
// now revoke the tokens
this.app().token().setToken(adminToken);
usersResource.entity("edanuff").connection("revoketokens").post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// the token3 shouldn't work
@@ -1070,7 +1071,7 @@ public class UserResourceIT extends AbstractRestIT {
usersResource.post(new User("test_1", "Test1 User", "test_1@test.com", "test123")); // client.setApiUrl(apiUrl);
usersResource.post(new User("test_2", "Test2 User", "test_2@test.com", "test123")); // client.setApiUrl(apiUrl);
usersResource.post(new User("test_3", "Test3 User", "test_3@test.com", "test123")); // client.setApiUrl(apiUrl);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Entity appInfo = this.app().get().getResponse().getEntities().get(0);
@@ -1080,7 +1081,7 @@ public class UserResourceIT extends AbstractRestIT {
assertNotNull(token.getAccessToken());
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
int status = 0;
@@ -1123,7 +1124,7 @@ public class UserResourceIT extends AbstractRestIT {
Entity entityConn = usersResource.entity(userId).connection("deactivate").post(new Entity());
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
try {
this.app().token().post(new Token("test_1", "test123"));
@@ -1141,7 +1142,7 @@ public class UserResourceIT extends AbstractRestIT {
String randomName = "user1_" + UUIDUtils.newTimeUUID().toString();
User user = new User(randomName, randomName, randomName + "@apigee.com", "password");
usersResource.post(user);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// should update a field
Entity response = usersResource.entity(randomName).get();
@@ -1155,7 +1156,7 @@ public class UserResourceIT extends AbstractRestIT {
response = usersResource.post(user2);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Entity response2 = usersResource.entity(randomName).get();
@@ -1208,7 +1209,7 @@ public class UserResourceIT extends AbstractRestIT {
assertNotNull(userId);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
ql = "uuid = " + userId;
@@ -1223,7 +1224,7 @@ public class UserResourceIT extends AbstractRestIT {
public void testCredentialsTransfer() throws Exception {
usersResource.post(new User("test_1", "Test1 User", "test_1@test.com", "test123")); // client.setApiUrl(apiUrl);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Entity appInfo = this.app().get().getResponse().getEntities().get(0);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
index 965105b..315f1f5 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java
@@ -57,7 +57,7 @@ public class EventsResourceIT extends AbstractRestIT {
assertNotNull(node.getEntities());
String advertising = node.getEntity().get("uuid").toString();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
payload = new LinkedHashMap<String, Object>();
payload.put( "timestamp", 0 );
@@ -74,7 +74,7 @@ public class EventsResourceIT extends AbstractRestIT {
assertNotNull(node.getEntities());
String sales = node.getEntity().get("uuid").toString();
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
payload = new LinkedHashMap<String, Object>();
payload.put( "timestamp", 0 );
@@ -91,7 +91,7 @@ public class EventsResourceIT extends AbstractRestIT {
assertNotNull(node.getEntities());
String marketing = node.getEntity().get( "uuid" ).toString();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String lastId = null;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
index f47fba0..74ad38b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/BasicGeoTests.java
@@ -109,7 +109,7 @@ public class BasicGeoTests extends AbstractRestIT {
assertEquals( lat.toString(), entity.getMap("location").get("latitude").toString() );
assertEquals( lon.toString(), entity.getMap("location").get("longitude").toString() );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//2. read back that entity make sure it is accurate
/*
@@ -144,7 +144,7 @@ public class BasicGeoTests extends AbstractRestIT {
assertEquals( newLon.toString(), entity.get( "location" ).get("longitude").asText() );
*/
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//4. read back the updated entity, make sure it is accurate
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
index 91ccf38..9a1cb3c 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/GeoPagingTest.java
@@ -91,7 +91,7 @@ public class GeoPagingTest extends AbstractRestIT {
}
}
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// 2. Query the groups from a nearby location, restricting the search
// by creation time to a single entity where created[i-1] < created[i] < created[i+1]
//since this geo location is contained by an actor it needs to be actor.location.
@@ -150,7 +150,7 @@ public class GeoPagingTest extends AbstractRestIT {
.map("latitude", -33.889058)
.map("longitude", 151.124024));
this.app().collection(collectionType).post(props2);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
Collection collection = this.app().collection(collectionType).get();
assertEquals("Should return both entities", 2, collection.getResponse().getEntityCount());
@@ -182,7 +182,7 @@ public class GeoPagingTest extends AbstractRestIT {
cats[i] = cat;
this.app().collection("cats").post(cat);
}
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
QueryParameters params = new QueryParameters();
for (int consistent = 0; consistent < 20; consistent++) {
@@ -229,7 +229,7 @@ public class GeoPagingTest extends AbstractRestIT {
cats[i] = cat;
this.app().collection("cats").post(cat);
}
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
final QueryParameters params = new QueryParameters();
for (int consistent = 0; consistent < 20; consistent++) {
@@ -279,6 +279,7 @@ public class GeoPagingTest extends AbstractRestIT {
.map("latitude", -33.746369)
.map("longitude", 150.952183));
this.app().collection(collectionType).post(props);
+ this.waitForQueueDrainAndRefreshIndex();
Entity props2 = new Entity();
props2.put("name", "usergrid2");
@@ -286,7 +287,7 @@ public class GeoPagingTest extends AbstractRestIT {
.map("latitude", -33.889058)
.map("longitude", 151.124024));
this.app().collection(collectionType).post(props2);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// 2. Query from the center point to ensure that one is returned
Collection collection = this.app().collection(collectionType).get(queryClose);
@@ -326,7 +327,7 @@ public class GeoPagingTest extends AbstractRestIT {
.map("longitude", 150.952183));
this.app().collection("users").post(props);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// 2. Create a list of geo points
List<double[]> points = new ArrayList<>();
points.add(new double []{33.746369, -89});//Woodland, MS
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
index 7525d51..02a54da 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/MatrixQueryTests.java
@@ -21,7 +21,6 @@ import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
import org.apache.usergrid.rest.test.resource.model.Collection;
import org.apache.usergrid.rest.test.resource.model.Entity;
import org.apache.usergrid.rest.test.resource.model.QueryParameters;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.UUID;
@@ -78,7 +77,7 @@ public class MatrixQueryTests extends AbstractRestIT {
restaurant2 = this.app().collection("restaurants").post(restaurant2);
restaurant3 = this.app().collection("restaurants").post(restaurant3);
restaurant4 = this.app().collection("restaurants").post(restaurant4);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//3. Create "likes" connections between users and restaurants
//user 1 likes old major
@@ -91,7 +90,7 @@ public class MatrixQueryTests extends AbstractRestIT {
//user 3 likes Lola (it shouldn't appear in the results)
this.app().collection("users").entity(user3).connection("likes").collection("restaurants").entity(restaurant4).post();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//4. Retrieve "likes" connections per user and ensure the correct restaurants are returned
Collection user1likes = this.app().collection("users").entity(user1).connection("likes").get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
index a190526..6591713 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/OrderByTest.java
@@ -246,7 +246,7 @@ public class OrderByTest extends QueryTestBase {
}
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//2. Query without 'order by'
String query = "select * where created > " + created;
QueryParameters params = new QueryParameters().setQuery(query);
@@ -289,7 +289,7 @@ public class OrderByTest extends QueryTestBase {
this.app().collection("activity").post(props);
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//2. Query a subset of the entities, specifying order and limit
String query = "select * where created > " + 1 + " order by created desc";
QueryParameters params = new QueryParameters().setQuery(query).setLimit(5);
@@ -334,7 +334,7 @@ public class OrderByTest extends QueryTestBase {
logger.info(String.valueOf(Long.parseLong(activities[0].get("created").toString())));
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex(750);
ArrayUtils.reverse(activities);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
index f7eb3fe..0adafef 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/QueryTestBase.java
@@ -65,7 +65,7 @@ public class QueryTestBase extends AbstractRestIT {
logger.info(entities[i].entrySet().toString());
}
//refresh the index so that they are immediately searchable
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
return entities;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
index 286c984..acf51c1 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/SelectMappingsQueryTest.java
@@ -83,7 +83,7 @@ public class SelectMappingsQueryTest extends QueryTestBase {
.withProp( "testProp", value )
.withProp( "TESTPROP", otherValue);
app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// testProp and TESTPROP should now have otherValue
@@ -110,7 +110,7 @@ public class SelectMappingsQueryTest extends QueryTestBase {
Entity entity = new Entity()
.withProp( "testprop", value );
app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// testProp and TESTPROP should now have otherValue
@@ -130,7 +130,7 @@ public class SelectMappingsQueryTest extends QueryTestBase {
Entity entity = new Entity()
.withProp( "testprop", value );
app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// now query this without encoding the plus symbol
QueryParameters params = new QueryParameters()
@@ -160,13 +160,13 @@ public class SelectMappingsQueryTest extends QueryTestBase {
String value = RandomStringUtils.randomAlphabetic( 20 );
Entity entity = new Entity().withProp( "testProp", value );
app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// override with TESTPROP=newValue
String newValue = RandomStringUtils.randomAlphabetic( 20 );
entity = new Entity().withProp( "TESTPROP", newValue );
app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// testProp and TESTPROP should new be queryable by new value
@@ -193,13 +193,13 @@ public class SelectMappingsQueryTest extends QueryTestBase {
String value = RandomStringUtils.randomAlphabetic( 20 );
Entity entity = new Entity().withProp( "TESTPROP", value );
app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// override with testProp=newValue
String newValue = RandomStringUtils.randomAlphabetic( 20 );
entity = new Entity().withProp( "testProp", newValue );
app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// testProp and TESTPROP should new be queryable by new value
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
index 431d224..841ac1d 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AccessTokenIT.java
@@ -153,7 +153,7 @@ public class AccessTokenIT extends AbstractRestIT {
assertNotNull( token.getAccessToken() );
management().token().setToken( token );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
assertNotNull( management().me().get( Token.class ) );
@@ -177,7 +177,7 @@ public class AccessTokenIT extends AbstractRestIT {
assertNotNull( adminToken );
assertNotNull( adminToken.getAccessToken() );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
assertNotNull( management().me().get( Token.class ) );
@@ -237,7 +237,7 @@ public class AccessTokenIT extends AbstractRestIT {
management().token().setToken( clientSetup.getSuperuserToken() );
management().users().user( clientSetup.getUsername() ).revokeTokens().post(true , ApiResponse.class, null,null);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//test that token 1 doesn't work
@@ -278,7 +278,7 @@ public class AccessTokenIT extends AbstractRestIT {
management().token().setToken( clientSetup.getSuperuserToken() );
management().users().user( clientSetup.getUsername() ).revokeToken().post( false, ApiResponse.class,null,queryParameters );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//test that token 1 doesn't work
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
index f80f131..829f561 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
@@ -17,7 +17,6 @@
package org.apache.usergrid.rest.management;
-import com.sun.jersey.api.client.UniformInterfaceException;
import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.management.MockImapClient;
@@ -77,7 +76,7 @@ public class AdminUsersIT extends AbstractRestIT {
// change the password as admin. The old password isn't required
management.users().user( username ).password().post(Entity.class,passwordPayload);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//Get the token using the new password
Token adminToken = management.token().post( false, Token.class, new Token( username, "testPassword" ) ,null );
@@ -159,7 +158,7 @@ public class AdminUsersIT extends AbstractRestIT {
// change the password as admin. The old password isn't required
management.users().user( username ).password().post(Entity.class, passwordPayload );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//Get the token using the new password
@@ -197,7 +196,7 @@ public class AdminUsersIT extends AbstractRestIT {
management.token().setToken( clientSetup.getSuperuserToken());
management.users().user( username ).password().post( passwordPayload );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
assertNotNull( management.token().post( false, Token.class, new Token(username, "testPassword"), null ));
@@ -260,7 +259,7 @@ public class AdminUsersIT extends AbstractRestIT {
//Send rest call to the /testProperties endpoint to persist property changes
clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Create organization for the admin user to be confirmed
Organization organization = createOrgPayload( "testUnconfirmedAdminLogin", null );
@@ -342,7 +341,7 @@ public class AdminUsersIT extends AbstractRestIT {
//Send rest call to the /testProperties endpoint to persist property changes
clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Token superuserToken = management.token().post( Token.class,
new Token( clientSetup.getSuperuserName(), clientSetup.getSuperuserPassword() ) );
@@ -379,7 +378,7 @@ public class AdminUsersIT extends AbstractRestIT {
//Send rest call to the /testProperties endpoint to persist property changes
clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Token testToken = management().token().post(Token.class,
new Token( originalTestProperties.getAsString( PROPERTIES_TEST_ACCOUNT_ADMIN_USER_EMAIL ),
@@ -586,7 +585,7 @@ public class AdminUsersIT extends AbstractRestIT {
public void reactivateTest() throws Exception {
//call reactivate endpoint on default user
clientSetup.getRestClient().management().users().user( clientSetup.getUsername() ).reactivate().get();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Create mocked inbox and check to see if you recieved an email in the users inbox.
List<Message> inbox = Mailbox.get( clientSetup.getEmail());
@@ -599,7 +598,7 @@ public class AdminUsersIT extends AbstractRestIT {
// initiate password reset
management().users().user( clientSetup.getUsername() ).resetpw().post(new Form());
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create mocked inbox, get password reset email and extract token
List<Message> inbox = Mailbox.get( clientSetup.getEmail() );
@@ -630,7 +629,7 @@ public class AdminUsersIT extends AbstractRestIT {
assertTrue( html.contains( "password set" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
html = management().users().user( clientSetup.getUsername() ).resetpw().post( formData );
@@ -644,7 +643,7 @@ public class AdminUsersIT extends AbstractRestIT {
// initiate password reset
management().users().user( clientSetup.getUsername() ).resetpw().post(new Form());
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create mocked inbox, get password reset email and extract token
List<Message> inbox = Mailbox.get( clientSetup.getEmail() );
@@ -725,7 +724,7 @@ public class AdminUsersIT extends AbstractRestIT {
payload.put( "newpassword", passwords[1] );
management().users().user( clientSetup.getUsername() ).password().post( Entity.class,payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
payload.put( "newpassword", passwords[0] );
payload.put( "oldpassword", passwords[1] );
@@ -747,7 +746,7 @@ public class AdminUsersIT extends AbstractRestIT {
// request password reset
management().users().user( clientSetup.getUsername() ).resetpw().post(new Form());
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get resetpw token from email
@@ -774,7 +773,7 @@ public class AdminUsersIT extends AbstractRestIT {
String html = management().users().user( clientSetup.getUsername() ).resetpw().getTarget().request()
.post( javax.ws.rs.client.Entity.form(formData), String.class );
assertTrue( html.contains( "password set" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// login with new password and get token
@@ -797,7 +796,7 @@ public class AdminUsersIT extends AbstractRestIT {
put("newpassword", "test");
}};
management().users().user( clientSetup.getUsername() ).password().post( false, payload, null );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get password and check password change time again
@@ -851,7 +850,7 @@ public class AdminUsersIT extends AbstractRestIT {
//Create admin user
management().orgs().org( clientSetup.getOrganizationName() ).users().post(ApiResponse.class ,adminUserPayload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Retrieves the admin users
ApiResponse adminUsers = management().orgs().org( clientSetup.getOrganizationName() ).users().get(ApiResponse.class);
@@ -905,7 +904,7 @@ public class AdminUsersIT extends AbstractRestIT {
//Send rest call to the /testProperties endpoint to persist property changes
clientSetup.getRestClient().testPropertiesResource().post( testPropertiesPayload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Retrieve properties and ensure that they are set correctly.
ApiResponse apiResponse = clientSetup.getRestClient().testPropertiesResource().get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
index 81ff2d3..1b649d2 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ExportResourceIT.java
@@ -141,7 +141,7 @@ public class ExportResourceIT extends AbstractRestIT {
assertNotNull( uuid );
exportEntity = null;
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
try {
exportEntity = management().orgs().org( clientSetup.getOrganizationName() )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
index c390393..9c35a6c 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
@@ -193,7 +193,7 @@ public class ImportResourceIT extends AbstractRestIT {
Organization orgPayload = new Organization(
newOrgName, newOrgUsername, newOrgEmail, newOrgName, newOrgPassword, null);
Organization orgCreatedResponse = clientSetup.getRestClient().management().orgs().post(orgPayload);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
assertNotNull(orgCreatedResponse);
@@ -391,8 +391,8 @@ public class ImportResourceIT extends AbstractRestIT {
// for ( org.apache.usergrid.persistence.Entity importedThing : importedThings ) {
// emApp1.delete( importedThing );
// }
-// emApp1.refreshIndex();
-// emApp2.refreshIndex();
+// emApp1.waitForQueueDrainAndRefreshIndex();
+// emApp2.waitForQueueDrainAndRefreshIndex();
//
// importedThings = emApp2.getCollection(
// appId2, "things", null, Query.Level.ALL_PROPERTIES).getEntities();
@@ -438,7 +438,7 @@ public class ImportResourceIT extends AbstractRestIT {
.addToPath(importEntity.getUuid().toString())
.get();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Entity importGetIncludes = this.management().orgs().org( org ).app()
.addToPath(app)
@@ -671,7 +671,7 @@ public class ImportResourceIT extends AbstractRestIT {
Thread.sleep(1000);
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
return importEntity;
}
@@ -697,7 +697,7 @@ public class ImportResourceIT extends AbstractRestIT {
}
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// // first two things are related to each other
// em.createConnection(new SimpleEntityRef(type, created.get(0).getUuid()),
@@ -705,7 +705,7 @@ public class ImportResourceIT extends AbstractRestIT {
// em.createConnection(new SimpleEntityRef(type, created.get(1).getUuid()),
// "related", new SimpleEntityRef(type, created.get(0).getUuid()));
//
-// em.refreshIndex();
+// em.waitForQueueDrainAndRefreshIndex();
}
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
index 635368e..0a80d73 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
@@ -213,7 +213,7 @@ public class ManagementResourceIT extends AbstractRestIT {
users1.add( "follower" + Integer.toString( i ) );
}
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
checkFeed( "leader1", users1 );
//try with 11
@@ -230,20 +230,20 @@ public class ManagementResourceIT extends AbstractRestIT {
//create user
createUser( leader );
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
String preFollowContent = leader + ": pre-something to look for " + UUID.randomUUID().toString();
addActivity( leader, leader + " " + leader + "son", preFollowContent );
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
String lastUser = followers.get( followers.size() - 1 );
int i = 0;
for ( String user : followers ) {
createUser( user );
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
follow( user, leader );
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
}
userFeed = getUserFeed( lastUser );
assertTrue( userFeed.size() == 1 );
@@ -254,7 +254,7 @@ public class ManagementResourceIT extends AbstractRestIT {
String postFollowContent = leader + ": something to look for " + UUID.randomUUID().toString();
addActivity( leader, leader + " " + leader + "son", postFollowContent );
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
//check feed
userFeed = getUserFeed( lastUser );
@@ -321,7 +321,7 @@ public class ManagementResourceIT extends AbstractRestIT {
.post( new Application( "mgmt-org-app" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Entity appdata = apiResponse.getEntities().get(0);
assertEquals((clientSetup.getOrganizationName() + "/mgmt-org-app")
@@ -336,7 +336,7 @@ public class ManagementResourceIT extends AbstractRestIT {
assertEquals("Roles", roles.get("title").toString());
assertEquals(4, roles.size());
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
// GET /applications/mgmt-org-app
@@ -361,7 +361,7 @@ public class ManagementResourceIT extends AbstractRestIT {
public void checkSizes() throws Exception {
final String appname = clientSetup.getAppName();
this.app().collection("testCollection").post(new Entity().chainPut("name","test"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Entity size = management().orgs().org( clientSetup.getOrganizationName() ).app().addToPath(appname).addToPath("_size").get();
Entity rolesSize = management().orgs().org(clientSetup.getOrganizationName()).app().addToPath(appname).addToPath("roles/_size").get();
Entity collectionsSize = management().orgs().org(clientSetup.getOrganizationName()).app().addToPath(appname).addToPath("collections/_size").get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
index 2bbdaaf..ad204ae 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/OrganizationsIT.java
@@ -68,7 +68,7 @@ public class OrganizationsIT extends AbstractRestIT {
assertNotNull( organizationResponse );
// Thread.sleep( 1000 );
-// this.refreshIndex();
+// this.waitForQueueDrainAndRefreshIndex();
//Creates token
Token token =
@@ -78,7 +78,7 @@ public class OrganizationsIT extends AbstractRestIT {
assertNotNull( token );
- //this.refreshIndex();
+ //this.waitForQueueDrainAndRefreshIndex();
//Assert that the get returns the correct org and owner.
Organization returnedOrg = clientSetup.getRestClient().management().orgs().org( organization.getOrganization() ).get();
@@ -136,7 +136,7 @@ public class OrganizationsIT extends AbstractRestIT {
// Create organization
Organization organization = createOrgPayload( "testCreateDuplicateOrgName", null );
Organization orgCreatedResponse = clientSetup.getRestClient().management().orgs().post( organization );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
assertNotNull( orgCreatedResponse );
@@ -193,7 +193,7 @@ public class OrganizationsIT extends AbstractRestIT {
//create the org/owner
Organization orgCreatedResponse = clientSetup.getRestClient().management().orgs().post( organization );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
assertNotNull( orgCreatedResponse );
@@ -394,7 +394,7 @@ public class OrganizationsIT extends AbstractRestIT {
//update the organization.
management().orgs().org( clientSetup.getOrganizationName() ).put(orgPayload);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//retrieve the organization
Organization orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
@@ -408,7 +408,7 @@ public class OrganizationsIT extends AbstractRestIT {
//update the organization.
management().orgs().org( clientSetup.getOrganizationName() ).put(orgPayload);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
@@ -433,7 +433,7 @@ public class OrganizationsIT extends AbstractRestIT {
Organization orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//attempt to post duplicate connection
Entity userPostResponse = management().orgs().org( clientSetup.getOrganizationName() ).users().user( clientSetup.getEmail() ).put( entity );
@@ -461,7 +461,7 @@ public class OrganizationsIT extends AbstractRestIT {
Organization orgResponse = management().orgs().org( clientSetup.getOrganizationName() ).get();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//attempt to post duplicate connection
try {
Entity userPostResponse = management().orgs().org( clientSetup.getOrganizationName() ).users().post( Entity.class, entity );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
index 8404632..90a6919 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/RegistrationIT.java
@@ -170,7 +170,7 @@ public class RegistrationIT extends AbstractRestIT {
"changeme");
UUID userId = node.getUuid();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String subject = "Password Reset";
@@ -239,7 +239,7 @@ public class RegistrationIT extends AbstractRestIT {
//Disgusting data manipulation to parse the form response.
Map adminUserPostResponse = (management().users().post( User.class, userForm ));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Map adminDataMap = ( Map ) adminUserPostResponse.get( "data" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
index 6e5e4f9..4799b0c 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/test/resource/AbstractRestIT.java
@@ -173,10 +173,12 @@ public class AbstractRestIT extends JerseyTest {
return this.app().token().post( new Token( username, password ) );
}
- public void refreshIndex() {
- //TODO see how we can refresh index (not async) for tests so sleep may not be needed
+ public void waitForQueueDrainAndRefreshIndex(int waitTimeMillis) {
+ // indexing is async, tests will need to wait for stuff to be processed.
+ // this sleep is slightly longer becasue distributed queueing on top of Cassandra can be used without and in-mem
+ // copy. see Qakka in the persistence module
try {
- Thread.sleep(250); //ensure index docs are finished being sent to Elasticsearch by Usergrid before refresh
+ Thread.sleep(waitTimeMillis);
clientSetup.refreshIndex();
} catch (InterruptedException e) {
System.out.println("Error refreshing index");
@@ -184,6 +186,10 @@ public class AbstractRestIT extends JerseyTest {
}
}
+ public void waitForQueueDrainAndRefreshIndex() {
+ waitForQueueDrainAndRefreshIndex(750);
+ }
+
/**
* Takes in the expectedStatus message and the expectedErrorMessage then compares it to the ClientErrorException
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties
index 98c5640..615bedd 100644
--- a/stack/rest/src/test/resources/usergrid-custom-test.properties
+++ b/stack/rest/src/test/resources/usergrid-custom-test.properties
@@ -72,6 +72,11 @@ elasticsearch.queue_impl=DISTRIBUTED
# Queueing Test Settings
# Reduce the long polling time for the tests
queue.long.polling.time.millis=50
-queue.num.actors=50
+queue.num.actors=100
queue.sender.num.actors=100
queue.writer.num.actors=100
+elasticsearch.worker_count=12
+elasticsearch.worker_count_utility=4
+queue.get.timeout.seconds=10
+queue.send.timeout.seconds=10
+usergrid.push.worker_count=8
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7d02360..22d7344 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -20,6 +20,7 @@ import com.codahale.metrics.*;
import com.codahale.metrics.Timer;
import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
@@ -148,7 +149,7 @@ public class QueueListener {
Thread.currentThread().setDaemon(true);
}
- Thread.currentThread().setName(getClass().getSimpleName()+"_PushNotifications-"+threadNumber);
+ Thread.currentThread().setName(getClass().getSimpleName()+"_Push-"+ RandomStringUtils.randomAlphanumeric(4)+"-"+threadNumber);
final AtomicInteger consecutiveExceptions = new AtomicInteger();
@@ -268,12 +269,16 @@ public class QueueListener {
if (logger.isTraceEnabled()) {
logger.trace("no messages...sleep...{}", sleepWhenNoneFound);
}
- Thread.sleep(sleepWhenNoneFound);
+ try {
+ Thread.sleep(sleepWhenNoneFound);
+ } catch (InterruptedException e){
+ // noop
+ }
}
timerContext.stop();
//send to the providers
consecutiveExceptions.set(0);
- }catch (Exception ex){
+ } catch (Exception ex){
logger.error("failed to dequeue",ex);
// clear the queue name cache b/c tests might have wiped the keyspace
@@ -286,7 +291,7 @@ public class QueueListener {
Thread.sleep(sleeptime);
}catch (InterruptedException ie){
if (logger.isTraceEnabled()) {
- logger.info("sleep interrupted");
+ logger.trace("sleep interrupted");
}
}
}
@@ -306,7 +311,7 @@ public class QueueListener {
return;
}
for(Future future : futures){
- future.cancel(true);
+ future.cancel(false);
}
pool.shutdownNow();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index 6b619b7..44b0139 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -16,7 +16,6 @@
*/
package org.apache.usergrid.services.notifications.gcm;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.android.gcm.server.*;
import org.apache.usergrid.persistence.entities.Notification;
@@ -99,7 +98,7 @@ public class GCMAdapter implements ProviderAdapter {
if(!map.containsKey(priorityKey) && notification.getPriority() != null){
map.put(priorityKey, notification.getPriority());
}
- Batch batch = getBatch( map);
+ Batch batch = getBatch( map );
batch.add(providerId, tracker);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java b/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
index 89ff272..74c5c92 100644
--- a/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
+++ b/stack/services/src/test/java/org/apache/usergrid/ServiceApplication.java
@@ -73,7 +73,7 @@ public class ServiceApplication extends CoreApplication {
ServiceResults testRequest = testRequest( action, expectedCount, true, params );
if ( !action.equals( ServiceAction.GET )) {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
}
return testRequest;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java b/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
index a46bd60..60a5ba0 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/EmailFlowIT.java
@@ -235,7 +235,7 @@ public class EmailFlowIT {
assertNotNull( orgOwner );
ApplicationInfo app = setup.getMgmtSvc().createApplication( orgOwner.getOrganization().getUuid(), appName );
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
//turn on app admin approval for app users
enableAdminApproval(app.getId());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java b/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
index 075ee03..20d12ab 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/RoleIT.java
@@ -62,7 +62,7 @@ public class RoleIT {
UUID applicationId = setup.getMgmtSvc().createApplication( organization.getUuid(), "test-app" ).getId();
EntityManager em = setup.getEmf().getEntityManager( applicationId );
- setup.getEntityIndex().refresh(em.getApplicationId());
+ setup.getEntityIndex().waitForQueueDrainAndRefresh(em.getApplicationId(), 500);
Map<String, Object> properties = new LinkedHashMap<String, Object>();
properties.put( "username", "edanuff5" );
@@ -71,8 +71,7 @@ public class RoleIT {
User user = em.create( User.ENTITY_TYPE, User.class, properties );
em.createRole( "logged-in", "Logged In", 2000 );
- setup.getEntityIndex().refresh(em.getApplicationId());
- setup.getEntityIndex().refresh(em.getApplicationId());
+ setup.getEntityIndex().waitForQueueDrainAndRefresh(em.getApplicationId(), 500);
em.addUserToRole( user.getUuid(), "logged-in" );
String accessToken = setup.getMgmtSvc().getAccessTokenForAppUser( applicationId, user.getUuid(), 0 );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
index c071d1f..62818c2 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/CollectionServiceIT.java
@@ -232,7 +232,7 @@ public class CollectionServiceIT extends AbstractServiceIT {
// ok
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
try {
// try DELETE on cats with dogs name
app.testRequest( ServiceAction.DELETE, 0, "cats", "Danny" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
index d3c2436..d37bb10 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/GroupServiceIT.java
@@ -68,13 +68,14 @@ public class GroupServiceIT extends AbstractServiceIT {
app.createGroupRole( group.getUuid(), "admin", 0 );
app.createGroupRole( group.getUuid(), "author", 0 );
- setup.getEntityIndex().refresh(app.getId());
+ app.waitForQueueDrainAndRefreshIndex(500);
app.grantGroupRolePermission( group.getUuid(), "admin", "users:access:*" );
app.grantGroupRolePermission( group.getUuid(), "admin", "groups:access:*" );
app.grantGroupRolePermission( group.getUuid(), "author", "assets:access:*" );
- setup.getEntityIndex().refresh(app.getId());
+
+ app.waitForQueueDrainAndRefreshIndex(500);
app.testDataRequest( ServiceAction.GET, "groups", group.getUuid(), "rolenames" );
app.testDataRequest( ServiceAction.GET, "groups", group.getUuid(), "roles", "admin", "permissions" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
index 8c2be2c..81dced1 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
@@ -82,6 +82,8 @@ public class ServiceInvocationIT extends AbstractServiceIT {
app.testRequest( ServiceAction.POST, 1, null, "users", "edanuff", "likes", cat.getUuid() );
+ app.waitForQueueDrainAndRefreshIndex(250);
+
Entity restaurant = app.doCreate( "restaurant", "Brickhouse" );
app.createConnection( user, "likes", restaurant );
@@ -92,6 +94,8 @@ public class ServiceInvocationIT extends AbstractServiceIT {
app.testRequest( ServiceAction.POST, 1, "users", user.getUuid(), "connections", "likes", restaurant.getUuid() );
+ app.waitForQueueDrainAndRefreshIndex(250);
+
app.testRequest( ServiceAction.GET, 1, "users", "edanuff", "likes", "cats" );
app.testRequest( ServiceAction.GET, 3, "users", "edanuff", "likes" );
@@ -104,7 +108,7 @@ public class ServiceInvocationIT extends AbstractServiceIT {
app.testRequest( ServiceAction.GET, 1, "users", "edanuff", "likes",
Query.fromQL( "select * where name='axis*'" ) );
-// TODO, we don't allow this at the RESt level, why is this a test?
+// TODO, we don't allow this at the REST level, why is this a test?
// app.testRequest( ServiceAction.GET, 3, null, "users", "edanuff", "connections" );
app.put( "color", "blacknwhite" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
index 5ea815f..c035192 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
@@ -28,7 +28,6 @@ import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.services.AbstractServiceIT;
-import org.apache.usergrid.services.notifications.gcm.NotificationsServiceIT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +51,7 @@ public abstract class AbstractServiceNotificationIT extends AbstractServiceIT {
throws Exception {
long timeout = System.currentTimeMillis() + 60000;
while (System.currentTimeMillis() < timeout) {
- Thread.sleep(200);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex(200);
notification = app.getEntityManager().get(notification.getUuid(), Notification.class);
if (notification.getFinished() != null) {
return notification;
@@ -95,10 +93,9 @@ public abstract class AbstractServiceNotificationIT extends AbstractServiceIT {
}
}
- //assertEquals(expected, receipts.size());
- if( expected != receipts.size()){
- logger.warn("Expected receipt count {} does not match actual count {}", expected, receipts.size());
- }
+
+ assertEquals(expected, receipts.size());
+
for (EntityRef receipt : receipts) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 2a757ca..1c3bbcd 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -117,7 +117,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
setup.getEntityIndex().refresh(app.getId());
listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(), new Properties());
- listener.DEFAULT_SLEEP = 200;
listener.start();
}
@@ -234,24 +233,24 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
);
- // verify Query for CREATED state
+ notificationWaitForComplete(notification);
+ app.waitForQueueDrainAndRefreshIndex(250);
+
+ // verify Query for FINISHED state and that the devices processed is 0
Query query = Query.fromEquals( "state", Notification.State.FINISHED.toString() );
Results results = app.getEntityManager().searchCollection(app.getEntityManager().getApplicationRef(), "notifications", query);
- Entity entity = results.getEntitiesMap().get(notification.getUuid());
- assertNotNull(entity);
-
- notificationWaitForComplete(notification);
+ notification = (Notification)results.getEntitiesMap().get(notification.getUuid()).toTypedEntity();
+ assertEquals(0, notification.getDeviceProcessedCount());
// perform push //
- //ns.getQueueManager().processBatchAndReschedule(notification, null);
notification = app.getEntityManager().get(e.getUuid(), Notification.class);
// verify Query for FINISHED state
query = Query.fromEquals("state", Notification.State.FINISHED.toString());
results = app.getEntityManager().searchCollection(app.getEntityManager().getApplicationRef(),
"notifications", query);
- entity = results.getEntitiesMap().get(notification.getUuid());
+ Entity entity = results.getEntitiesMap().get(notification.getUuid());
assertNotNull(entity);
notification = (Notification) entity.toTypedEntity();
@@ -501,7 +500,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@Test
public void oneDeviceTwoNotifiers() throws Exception {
- // This test should configure 2 notifiers on a device and ensure that we can send to one of them
+ // This test should configure 2 notifiers on device1 and ensure that we can send to one of them
// create a 2nd notifier //
Object notifierName1 = "apNs2";
@@ -525,17 +524,19 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
assertEquals(notifier2.getProvider(), PROVIDER);
assertEquals(notifier2.getEnvironment(), environment1);
+
+ // Add a device token for the 2nd notifier
+ app.clear();
String key2 = notifier2.getName() + NOTIFIER_ID_POSTFIX;
- device1.setProperty(key2, PUSH_TOKEN);
- app.getEntityManager().update(device1);
- setup.getEntityIndex().refresh(app.getId()); // need to refresh the index after an update
+ app.put(key2, PUSH_TOKEN);
+ app.testRequest(ServiceAction.PUT, 1, "devices", device1).getEntity();
// create push notification //
app.clear();
String payload = getPayload();
Map<String, String> payloads = new HashMap<String, String>(1);
- payloads.put(notifier.getUuid().toString(), payload);
+ payloads.put(notifierName, payload);
app.put("payloads", payloads);
app.put("debug",true);
@@ -543,14 +544,14 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
"notifications").getEntity();
app.testRequest(ServiceAction.GET, 1, "notifications", notificationEntity.getUuid());
- Notification notification = app.getEntityManager().get(notificationEntity.getUuid(),
- Notification.class);
- assertEquals(
- notification.getPayloads().get(notifier.getUuid().toString()),
- payload);
+ Notification notification = app.getEntityManager().get(notificationEntity.getUuid(), Notification.class);
+ assertEquals(payload, notification.getPayloads().get(notifierName));
// perform push //
notification = notificationWaitForComplete(notification);
+
+ app.waitForQueueDrainAndRefreshIndex(2500);
+
checkReceipts(notification, 1);
}
@@ -692,6 +693,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
// perform push //
notification = notificationWaitForComplete(notification);
+
+ app.waitForQueueDrainAndRefreshIndex(250);
+
checkReceipts(notification, 2);
// Statistics are not accurate. See - https://issues.apache.org/jira/browse/USERGRID-1207
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 1a9f4f7..8360009 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -62,13 +62,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
private NotificationsService ns;
private QueueListener listener;
-
- @BeforeClass
- public static void setup() {
-
-
- }
-
@Before
public void before() throws Exception {
@@ -103,7 +96,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
ns = getNotificationService();
listener = new QueueListener(ns.getServiceManagerFactory(), ns.getEntityManagerFactory(), new Properties());
- listener.DEFAULT_SLEEP = 200;
listener.start();
}
@@ -569,6 +561,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
// wait for notification to be marked finished
notification = notificationWaitForComplete(notification);
+ // receipts are created and queried, wait a bit longer for this to happen as indexing
+ app.waitForQueueDrainAndRefreshIndex(500);
+
// get the receipts entity IDs
List<EntityRef> receipts = getNotificationReceipts(notification);
assertEquals(1, receipts.size());
@@ -635,6 +630,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
notification = notificationWaitForComplete(notification);
app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
+
+ // receipts are created and queried, wait a bit longer for this to happen as indexing
+ app.waitForQueueDrainAndRefreshIndex(500);
+
// get the receipts entity IDs
List<EntityRef> receipts = getNotificationReceipts(notification);
assertEquals(1, receipts.size());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/services/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/services/src/test/resources/usergrid-custom-test.properties b/stack/services/src/test/resources/usergrid-custom-test.properties
index 49f8b5d..bcc8b8e 100644
--- a/stack/services/src/test/resources/usergrid-custom-test.properties
+++ b/stack/services/src/test/resources/usergrid-custom-test.properties
@@ -38,13 +38,14 @@ elasticsearch.queue_impl.resolution=true
elasticsearch.queue_impl=DISTRIBUTED
# Queueing Test Settings
-# Reduce the long polling time for the tests
-queue.long.polling.time.millis=150
-queue.num.actors=5
-queue.sender.num.actors=5
-queue.writer.num.actors=5
-elasticsearch.worker_count=2
-usergrid.push.worker_count=2
+queue.long.polling.time.millis=50
+queue.num.actors=50
+queue.sender.num.actors=50
+queue.writer.num.actors=50
+queue.get.timeout.seconds=10
+elasticsearch.worker_count=8
+elasticsearch.worker_count_utility=8
+usergrid.push.worker_count=8
# This property is required to be set and cannot be defaulted anywhere
[3/3] usergrid git commit: Switch DISTRIBUTED database queueing to
default not cache in memory as the in memory impl causes duplicate messgae
processing quite often at the moment.
Posted by mr...@apache.org.
Switch DISTRIBUTED database queueing to default not cache in memory as the in memory impl causes duplicate messgae processing quite often at the moment.
- includes making all the tests work without in-memory queue fronting the database queue which really means adding some more delay in tests
- the tests now are actually faster now because the original refreshIndex() created and queried random entities which took longer in most cases
- uncommented the checkReceipts function in Notification tests as these are now passing, with an added fix for duplicate receipt creation
- some logging updates in the distributed database queueing impl (Qakka)
- increased the default take to 500 from the queue when DISTRIBUTED database queueing is configured ( which is the default now )
- Notifications Queue Listener thread names have a random identifier in included
- reduced the DISTRIBUTED database queueing default long poll to 1 second from 5 seconds
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d3e988bc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d3e988bc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d3e988bc
Branch: refs/heads/master
Commit: d3e988bcbb7eb417c84cfda7396ec3506521aa37
Parents: 8b63aae
Author: Michael Russo <ru...@google.com>
Authored: Sun Apr 2 16:14:05 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Sun Apr 2 16:14:05 2017 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 44 +---------
.../asyncevents/AsyncIndexProvider.java | 5 ++
.../java/org/apache/usergrid/Application.java | 4 +-
.../org/apache/usergrid/CoreApplication.java | 22 ++---
.../org/apache/usergrid/CoreITSetupImpl.java | 20 ++++-
.../org/apache/usergrid/TestEntityIndex.java | 1 +
.../corepersistence/AggregationServiceTest.java | 15 ++--
.../corepersistence/StaleIndexCleanupTest.java | 17 ++--
.../persistence/ApplicationServiceIT.java | 6 +-
.../usergrid/persistence/CollectionIT.java | 74 ++++++++--------
.../usergrid/persistence/CountingMutatorIT.java | 4 +-
.../persistence/EntityConnectionsIT.java | 16 ++--
.../usergrid/persistence/EntityManagerIT.java | 20 ++---
.../org/apache/usergrid/persistence/GeoIT.java | 32 +++----
.../persistence/GeoQueryBooleanTest.java | 4 +-
.../apache/usergrid/persistence/IndexIT.java | 18 ++--
.../usergrid/persistence/PathQueryIT.java | 7 +-
.../usergrid/persistence/PermissionsIT.java | 6 +-
.../usergrid/persistence/RebuildIndexTest.java | 26 +++---
.../cassandra/EntityManagerFactoryImplIT.java | 13 +--
.../persistence/query/ConnectionHelper.java | 2 +-
.../query/IntersectionTransitivePagingIT.java | 2 +-
.../query/IntersectionUnionPagingIT.java | 2 +-
.../persistence/query/IteratingQueryIT.java | 32 +++----
.../persistence/query/NotSubPropertyIT.java | 2 +-
.../persistence/query/ParenthesisProblemIT.java | 2 +-
.../resources/usergrid-custom-test.properties | 3 +
.../usergrid/persistence/qakka/QakkaFig.java | 4 +-
.../qakka/core/impl/InMemoryQueue.java | 4 +-
.../core/impl/QueueMessageManagerImpl.java | 9 +-
.../distributed/actors/QueueActorHelper.java | 15 +++-
.../distributed/actors/QueueActorRouter.java | 2 +-
.../distributed/actors/ShardAllocator.java | 4 +-
.../impl/DistributedQueueServiceImpl.java | 26 ++++--
.../distributed/actors/ShardAllocatorTest.java | 3 +
.../queue/src/test/resources/qakka.properties | 5 +-
.../usergrid/rest/CollectionMetadataIT.java | 4 +-
.../apache/usergrid/rest/NotificationsIT.java | 14 ++-
.../apache/usergrid/rest/PartialUpdateTest.java | 6 +-
.../apache/usergrid/rest/SystemResourceIT.java | 3 +-
.../rest/applications/ApplicationCreateIT.java | 2 +-
.../rest/applications/ApplicationDeleteIT.java | 6 +-
.../applications/ApplicationResourceIT.java | 12 +--
.../applications/assets/AssetResourceIT.java | 26 +++---
.../applications/assets/AwsAssetResourceIT.java | 22 ++---
.../collection/BrowserCompatibilityTest.java | 2 +-
.../collection/CollectionsResourceIT.java | 70 +++++++--------
.../collection/DuplicateNameIT.java | 2 +-
.../activities/ActivityResourceIT.java | 8 +-
.../collection/activities/PutTest.java | 6 +-
.../collection/devices/DevicesResourceIT.java | 12 +--
.../collection/groups/GroupResourceIT.java | 39 +++++----
.../collection/paging/PagingResourceIT.java | 10 +--
.../users/ConnectionResourceTest.java | 22 ++---
.../collection/users/OwnershipResourceIT.java | 24 +++---
.../collection/users/PermissionsResourceIT.java | 58 ++++++-------
.../collection/users/RetrieveUsersTest.java | 8 +-
.../collection/users/UserResourceIT.java | 89 ++++++++++----------
.../applications/events/EventsResourceIT.java | 6 +-
.../applications/queries/BasicGeoTests.java | 4 +-
.../applications/queries/GeoPagingTest.java | 13 +--
.../applications/queries/MatrixQueryTests.java | 5 +-
.../rest/applications/queries/OrderByTest.java | 6 +-
.../applications/queries/QueryTestBase.java | 2 +-
.../queries/SelectMappingsQueryTest.java | 14 +--
.../usergrid/rest/management/AccessTokenIT.java | 8 +-
.../usergrid/rest/management/AdminUsersIT.java | 33 ++++----
.../rest/management/ExportResourceIT.java | 2 +-
.../rest/management/ImportResourceIT.java | 14 +--
.../rest/management/ManagementResourceIT.java | 18 ++--
.../rest/management/OrganizationsIT.java | 16 ++--
.../rest/management/RegistrationIT.java | 4 +-
.../rest/test/resource/AbstractRestIT.java | 12 ++-
.../resources/usergrid-custom-test.properties | 7 +-
.../services/notifications/QueueListener.java | 15 ++--
.../services/notifications/gcm/GCMAdapter.java | 3 +-
.../org/apache/usergrid/ServiceApplication.java | 2 +-
.../apache/usergrid/management/EmailFlowIT.java | 2 +-
.../org/apache/usergrid/management/RoleIT.java | 5 +-
.../usergrid/services/CollectionServiceIT.java | 2 +-
.../usergrid/services/GroupServiceIT.java | 5 +-
.../usergrid/services/ServiceInvocationIT.java | 6 +-
.../AbstractServiceNotificationIT.java | 11 +--
.../apns/NotificationsServiceIT.java | 40 +++++----
.../gcm/NotificationsServiceIT.java | 15 ++--
.../resources/usergrid-custom-test.properties | 15 ++--
86 files changed, 605 insertions(+), 596 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1071842..cdb4fc7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -3089,52 +3089,16 @@ public class CpEntityManager implements EntityManager {
managerCache.getEntityIndex(applicationScope).addIndex(newIndexName, shards, replicas, writeConsistency);
}
+
@Override
public void initializeIndex(){
managerCache.getEntityIndex(applicationScope).initialize();
}
- /**
- * TODO, these 3 methods are super janky. During refactoring we should clean this model up
- */
- public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
- try {
- long start = System.currentTimeMillis();
- // refresh special indexes without calling EntityManager refresh because stack overflow
- Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>();
- map.put("some prop", "test");
- boolean hasFinished = false;
- Entity refreshEntity = create("refresh", map);
- EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo
- = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
- try {
- for (int i = 0; i < 20; i++) {
- if (searchCollection(
- new SimpleEntityRef(
- org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()),
- InflectionUtils.pluralize("refresh"),
- Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'")
- ).size() > 0
- ) {
- hasFinished = true;
- break;
- }
- int sleepTime = 500;
- logger.info("Sleeping {} ms during refreshIndex", sleepTime);
- Thread.sleep(sleepTime);
- indexRefreshCommandInfo
- = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
- }
- if(!hasFinished){
- throw new RuntimeException("Did not find entity {} during refresh. uuid->"+refreshEntity.getUuid());
- }
- }finally {
- delete(refreshEntity);
- }
- Thread.sleep(100);
-
- return indexRefreshCommandInfo;
+ public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
+ try {
+ return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
} catch (Exception e) {
throw new RuntimeException("refresh failed",e);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 81960f5..2ba6c0b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -36,6 +36,7 @@ import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
+import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.DISTRIBUTED;
import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL;
@@ -121,6 +122,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
asyncEventService.MAX_TAKE = 1000;
}
+ if ( impl.equals( DISTRIBUTED )) {
+ asyncEventService.MAX_TAKE = 500;
+ }
+
return asyncEventService;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/Application.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/Application.java b/stack/core/src/test/java/org/apache/usergrid/Application.java
index 378a4f7..102ee9c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/Application.java
+++ b/stack/core/src/test/java/org/apache/usergrid/Application.java
@@ -152,7 +152,9 @@ public interface Application extends TestRule {
public void remove( EntityRef entityRef ) throws Exception;
- public void refreshIndex();
+ public void waitForQueueDrainAndRefreshIndex(int waitTimeMillis);
+
+ public void waitForQueueDrainAndRefreshIndex();
/**
* Get the entity manager
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 9046f02..f505ead 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -181,8 +181,8 @@ public class CoreApplication implements Application, TestRule {
logger.info( "Created new application {} in organization {}", appName, orgName );
-// //wait for the index before proceeding
-// em.refreshIndex();
+ //wait for the index before proceeding
+ waitForQueueDrainAndRefreshIndex(500);
}
@@ -223,19 +223,21 @@ public class CoreApplication implements Application, TestRule {
return em.get( new SimpleEntityRef( type, id ) );
}
-
@Override
- public synchronized void refreshIndex() {
- //Insert test entity and find it
- setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid());
-
- if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
- setup.getEmf().refreshIndex(em.getApplicationId());
+ public synchronized void waitForQueueDrainAndRefreshIndex(int waitTimeMillis) {
+ try{
+ Thread.sleep(waitTimeMillis);
+ } catch (InterruptedException e ){
+ //noop
}
-
em.refreshIndex();
}
+ @Override
+ public synchronized void waitForQueueDrainAndRefreshIndex() {
+ waitForQueueDrainAndRefreshIndex(750);
+ }
+
@Override
public EntityManager getEntityManager() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
index 64b001c..bd6ae3e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
@@ -156,16 +156,28 @@ public class CoreITSetupImpl implements CoreITSetup, TestEntityIndex {
@Override
public void refresh(UUID appId){
try {
- Thread.sleep(50);
- } catch (InterruptedException ie){
+ Thread.sleep(125);
+
+ } catch (InterruptedException ie){
+ //noop
}
+
emf.refreshIndex(appId);
+ }
+
+ @Override
+ public void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis){
try {
- Thread.sleep(50);
- } catch (InterruptedException ie){
+ Thread.sleep(waitTimeMillis);
+
+ } catch (InterruptedException ie){
+ //noop
}
+
+ emf.refreshIndex(appId);
+
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
index 7da187a..e5e979e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
+++ b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
@@ -26,4 +26,5 @@ import java.util.UUID;
*/
public interface TestEntityIndex {
void refresh(UUID appId);
+ void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
index 9f1c9a4..55ce26e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
@@ -48,8 +48,8 @@ public class AggregationServiceTest extends AbstractCoreIT {
props.put("name", "myname");
Entity entity1 = this.app.getEntityManager().create("test", props);
Entity entity2 = this.app.getEntityManager().create("test2", props);
- this.app.refreshIndex();
- Thread.sleep(500);
+
+ this.app.waitForQueueDrainAndRefreshIndex(500);
long sum = aggregationService.getApplicationSize(applicationScope);
@@ -57,23 +57,24 @@ public class AggregationServiceTest extends AbstractCoreIT {
Assert.assertTrue(sum > (entity1.getSize() + entity2.getSize()));
long sum1 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
- Assert.assertEquals(sum1, entity1.getSize());
+ Assert.assertEquals(entity1.getSize(), sum1);
long sum2 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "test2s"));
- Assert.assertEquals(sum2, entity2.getSize());
+ Assert.assertEquals(entity2.getSize(), sum2);
props = new HashMap<>();
props.put("test", 1234);
props.put("name", "myname2");
Entity entity3 = this.app.getEntityManager().create("test", props);
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex(500);
+
long sum3 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
- Assert.assertEquals(sum3, entity1.getSize() + entity3.getSize());
+ Assert.assertEquals(entity1.getSize() + entity3.getSize(), sum3);
Map<String,Long> sumEach = aggregationService.getEachCollectionSize(applicationScope);
Assert.assertTrue(sumEach.containsKey("tests") && sumEach.containsKey("test2s"));
- Assert.assertEquals(sum3, (long) sumEach.get("tests"));
+ Assert.assertEquals((long) sumEach.get("tests"), sum3);
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index abe2615..0abd7a2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -105,7 +104,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
Entity thing = em.create("thing", new HashMap<String, Object>() {{
put("name", "thing1");
}});
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(1000);
assertEquals(1, queryCollectionCp("things", "thing", "select *").size());
@@ -116,7 +115,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
em.updateProperties(thing, new HashMap<String, Object>() {{
put("stuff", "widget");
}});
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(1000);
org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing);
@@ -161,7 +160,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
}}));
Thread.sleep( writeDelayMs );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
CandidateResults crs = queryCollectionCp( "things", "thing", "select *");
Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
@@ -210,7 +209,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
Thread.sleep(250); // delete happens asynchronously, wait for some time
//refresh the app index
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(250); // refresh happens asynchronously, wait for some time
@@ -231,7 +230,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
}
});
//refresh the app index
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
crs = queryCollectionCp("things", "thing", "select *");
@@ -265,7 +264,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
put("name", dogName);
}}));
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
CandidateResults crs = queryCollectionCp( "dogs", "dog", "select *");
Assert.assertEquals("Expect no stale candidates yet", numEntities, crs.size());
@@ -288,7 +287,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// wait for indexes to be cleared for the deleted entities
count = 0;
@@ -296,7 +295,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
do {
//trigger the repair
queryCollectionEm("dogs", "select * order by created");
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
crs = queryCollectionCp("dogs", "dog", "select *");
} while ( crs.size() != numEntities && count++ < 15 );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index 9ad90eb..547691f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
import com.google.inject.Injector;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.service.AggregationService;
import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -32,7 +31,6 @@ import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
import org.junit.Assert;
@@ -65,7 +63,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
map.put("somekey", UUID.randomUUID());
Entity entity = entityManager.create("tests", map);
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(500);
ApplicationScope appScope = CpNamingUtils.getApplicationScope(entityManager.getApplicationId());
Observable<Id> ids =
@@ -76,7 +74,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
this.app.getApplicationService().deleteAllEntities(appScope, 5);
count = ids.count().toBlocking().last();
Assert.assertEquals(count, 5);
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
Injector injector = SpringResource.getInstance().getBean(Injector.class);
GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class);
GraphManager graphManager = factory.createEdgeManager(appScope);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 3305e0e..f484f4f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -132,7 +132,7 @@ public class CollectionIT extends AbstractCoreIT {
activity3 = app.get( activity3.getUuid(), activity3.getType() );
app.addToCollection( user, "activities", activity3 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// empty query
Query query = new Query();
@@ -259,7 +259,7 @@ public class CollectionIT extends AbstractCoreIT {
activity3 = app.get(activity3.getUuid(), activity3.getType());
app.addToCollection(user, "activities", activity3);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// empty query
Query query = new Query();
@@ -295,7 +295,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity user = em.create( "user", properties );
assertNotNull( user );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
Query query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -315,7 +315,7 @@ public class CollectionIT extends AbstractCoreIT {
em.update( user );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// search with the old username, should be no results
query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -354,7 +354,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity user = em.create( "user", properties );
assertNotNull( user );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
final Query query = Query.fromQL( "middlename = '" + middleName + "'" );
@@ -386,7 +386,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity user = em.create( "user", properties );
assertNotNull( user );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
final Query query = Query.fromQL( "lastname = '" + lastName + "'" );
@@ -434,7 +434,7 @@ public class CollectionIT extends AbstractCoreIT {
properties.put("nickname", "ed");
em.updateProperties(user1, properties);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(1000);
final Query query = Query.fromQL( "nickname = 'ed'" );
@@ -469,7 +469,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity group = em.create( "group", properties );
assertNotNull( group );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
final Query query = Query.fromQL( "name = '" + groupName + "'" );
@@ -501,7 +501,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity group = em.create( "group", properties );
assertNotNull( group );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
@@ -559,7 +559,7 @@ public class CollectionIT extends AbstractCoreIT {
em.addToCollection( user, "activities", em.create( "activity", properties ) );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
final Query query = Query.fromQL( "verb = 'post'" );
@@ -593,7 +593,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity user2 = em.create( "user", properties );
assertNotNull( user2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
Query query = new Query();
@@ -636,7 +636,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity user2 = em.create( "user", properties );
assertNotNull( user2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
Query query = new Query();
@@ -677,7 +677,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity game2 = em.create( "orquerygame", properties );
assertNotNull( game2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// EntityRef
Query query = Query
@@ -756,7 +756,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity game2 = em.create( "game", properties );
assertNotNull( game2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// overlap
Query query = Query.fromQL(
@@ -817,7 +817,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity game2 = em.create( "game", properties );
assertNotNull( game2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// simple not
Query query = Query.fromQL( "select * where NOT keywords contains 'game'" );
@@ -893,7 +893,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity entity2 = em.create( "game", properties );
assertNotNull( entity2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// search for games without sub-field Foo should returned zero entities
@@ -949,7 +949,7 @@ public class CollectionIT extends AbstractCoreIT {
properties.put("keywords", "Action, New");
em.create( "game", properties );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "select * where keywords contains 'hot' or title contains 'hot'" );
Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -980,7 +980,7 @@ public class CollectionIT extends AbstractCoreIT {
properties.put( "keywords", "Action, New" );
Entity thirdGame = em.create( "game", properties );
- app.refreshIndex();//need to track all batches then resolve promises
+ app.waitForQueueDrainAndRefreshIndex();//need to track all batches then resolve promises
Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -1011,7 +1011,7 @@ public class CollectionIT extends AbstractCoreIT {
entityIds.add( created.getUuid() );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = new Query();
query.setLimit( 50 );
@@ -1039,7 +1039,7 @@ public class CollectionIT extends AbstractCoreIT {
numDeleted++;
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// wait for indexes to be cleared
Thread.sleep(1000); //TODO find why we have to wait. This is a bug
@@ -1096,7 +1096,7 @@ public class CollectionIT extends AbstractCoreIT {
int pageSize = 10;
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
final Query query = Query.fromQL( "index < " + size * 2 + " order by index asc" );
Results r = null;
@@ -1147,7 +1147,7 @@ public class CollectionIT extends AbstractCoreIT {
int pageSize = 10;
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "select * where index >= " + size / 2 + " sort by index asc" );
query.setLimit( pageSize );
@@ -1201,7 +1201,7 @@ public class CollectionIT extends AbstractCoreIT {
entityIds.add( created.getUuid() );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
int pageSize = 10;
@@ -1254,7 +1254,7 @@ public class CollectionIT extends AbstractCoreIT {
entityIds.add( created.getUuid() );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
int pageSize = 5;
@@ -1310,7 +1310,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity saved = em.create( "test", root );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "rootprop1 = 'simpleprop'" );
Entity entity;
@@ -1357,7 +1357,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity saved = em.create( "test", jsonData );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "intprop = 10" );
@@ -1416,7 +1416,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity saved = em.create( "test", props );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "myString = 'My simple string'" );
@@ -1441,7 +1441,7 @@ public class CollectionIT extends AbstractCoreIT {
em.create( "user", properties );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
String s = "select username, email where username = 'edanuff'";
Query query = Query.fromQL( s );
@@ -1471,7 +1471,7 @@ public class CollectionIT extends AbstractCoreIT {
em.create( "user", properties );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
String s = "select {name: username, email: email} where username = 'edanuff'";
Query query = Query.fromQL( s );
@@ -1503,7 +1503,7 @@ public class CollectionIT extends AbstractCoreIT {
final Entity entity = em.create( "user", properties );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
String s = "select * where username = 'ed@anuff.com'";
Query query = Query.fromQL( s );
@@ -1525,7 +1525,7 @@ public class CollectionIT extends AbstractCoreIT {
em.createConnection( foo, "testconnection", entity );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// now query via the testConnection, this should work
@@ -1569,7 +1569,7 @@ public class CollectionIT extends AbstractCoreIT {
em.create( "loveobject", properties );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
location = new LinkedHashMap<String, Object>();
location.put( "Place", "Via Pietro Maroncelli, 48, 62012 Santa Maria Apparente Province of Macerata, Italy" );
@@ -1587,7 +1587,7 @@ public class CollectionIT extends AbstractCoreIT {
em.create( "loveobject", properties );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// String s = "select * where Flag = 'requested'";
// String s = "select * where Flag = 'requested' and NOT Recipient.Username =
@@ -1632,7 +1632,7 @@ public class CollectionIT extends AbstractCoreIT {
createdEntities.add( created );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Results r = em.getCollection( em.getApplicationRef(), "users", null, 50, Level.ALL_PROPERTIES, false );
@@ -1729,7 +1729,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity game2 = em.create( "game", properties );
assertNotNull( game2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// overlap
Query query = new Query();
@@ -1763,7 +1763,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity game2 = em.create( "game", properties );
assertNotNull( game2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// overlap
Query query = new Query();
@@ -1797,7 +1797,7 @@ public class CollectionIT extends AbstractCoreIT {
Entity createUser2 = em.create( user2 );
assertNotNull( createUser2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// overlap
Query query = new Query();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
index 63c7cb8..596ec7c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
@@ -74,7 +74,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
properties.put( "username", "testuser" );
properties.put( "email", "test@foo.bar" );
Entity created = em.create( "user", properties );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Entity returned = em.get( created.getUuid() );
@@ -89,7 +89,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
Entity connectedEntity = em.create( "user", connectedProps );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// Connect from our new entity to our root one so it's updated when paging
em.createConnection( connectedEntity, "following", returned );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
index 296bf53..e1e24c4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
@@ -64,7 +64,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
assertEquals( 1, connectionTypes.size());
assertEquals("likes", connectionTypes.iterator().next());
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.IDS);
@@ -128,7 +128,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catB.getUuid() + "\n" );
em.createConnection( awardA, "awarded", catB );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// List forward connections for cat A
@@ -149,7 +149,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catA.getUuid() + "\n" );
em.createConnection( awardA, "awarded", catA );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// List forward connections for cat A
// Not valid with current usages
@@ -256,7 +256,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
em.createConnection( secondUserEntity, "likes", arrogantbutcher );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Results r = em.getTargetEntities(firstUserEntity, "likes", "restaurant", Level.IDS);
@@ -310,7 +310,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
em.createConnection( fredEntity, "likes", wilmaEntity );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// // search for "likes" edges from fred
// assertEquals( 1,
@@ -363,7 +363,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
em.createConnection( fredEntity, "likes", JohnEntity );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// now query via the testConnection, this should work
@@ -410,7 +410,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.ALL_PROPERTIES) ;
@@ -453,7 +453,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
//
// em.createConnection( fredEntity, "likes", wilmaEntity );
//
-// app.refreshIndex();
+// app.waitForQueueDrainAndRefreshIndex();
//
//// // search for "likes" edges from fred
//// assertEquals( 1,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
index cb3a728..e1e4a05 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
@@ -75,7 +75,7 @@ public class EntityManagerIT extends AbstractCoreIT {
assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username" ) );
assertEquals( "user.email not expected value", "ed@anuff.com", user.getProperty( "email" ) );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
EntityRef userRef = em.getAlias( new SimpleEntityRef( "application", applicationId ), "users", "edanuff" );
@@ -274,13 +274,13 @@ public class EntityManagerIT extends AbstractCoreIT {
Entity thing = em.create( "thing", properties );
logger.info( "Entity created" );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
logger.info( "Starting entity delete" );
em.delete( thing );
logger.info( "Entity deleted" );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// now search by username, no results should be returned
@@ -310,13 +310,13 @@ public class EntityManagerIT extends AbstractCoreIT {
Entity user = em.create( "user", properties );
logger.info( "Entity created" );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
logger.info( "Starting entity delete" );
em.delete( user );
logger.info( "Entity deleted" );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// now search by username, no results should be returned
@@ -335,7 +335,7 @@ public class EntityManagerIT extends AbstractCoreIT {
user = em.create( "user", properties );
logger.info( "Entity created" );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
final Query userNameQuery = Query.fromQL( "username = '" + name + "'" );
@@ -456,7 +456,7 @@ public class EntityManagerIT extends AbstractCoreIT {
EntityRef appRef = em.get( new SimpleEntityRef( "application", app.getId() ) );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Results r = em.getCollection( appRef, "things", null, 10, Level.ALL_PROPERTIES, false );
@@ -548,7 +548,7 @@ public class EntityManagerIT extends AbstractCoreIT {
Entity createdDevice = em.createItemInCollection( createdUser, "devices", "device", device );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Entity returnedDevice = em.get( new SimpleEntityRef( "device", createdDevice.getUuid() ) );
@@ -580,7 +580,7 @@ public class EntityManagerIT extends AbstractCoreIT {
Entity user = em.create( "robot", properties );
assertNotNull( user );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
assertNotNull( em.get( user.getUuid() ) );
}
@@ -608,7 +608,7 @@ public class EntityManagerIT extends AbstractCoreIT {
em.addToCollection(appRef, "fluffies", entityRef);
em.addToCollection(appRef, "fluffies", entityRef);
- //app.refreshIndex();
+ //app.waitForQueueDrainAndRefreshIndex();
Results results = em.getCollection(appRef,
"fluffies", null, 10, Level.ALL_PROPERTIES, true);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
index b7d708e..df77084 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
@@ -96,7 +96,7 @@ public class GeoIT extends AbstractCoreIT {
assertNotNull(hotel);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//2. Query with a globally large distance to verify location
@@ -142,7 +142,7 @@ public class GeoIT extends AbstractCoreIT {
}};
Entity user = em.create("user", properties);
assertNotNull(user);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//2. Query with a globally large distance to verify location
Query query = Query.fromQL("select * where location within " + Integer.MAX_VALUE + " of 0, 0");
@@ -154,7 +154,7 @@ public class GeoIT extends AbstractCoreIT {
user.getDynamicProperties().remove("location");
em.updateProperties(user, properties);
em.update(user);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//4. Repeat the query, expecting no results
listResults = em.searchCollection(em.getApplicationRef(), "users", query);
@@ -188,7 +188,7 @@ public class GeoIT extends AbstractCoreIT {
}};
Entity user = em.create("user", properties);
assertNotNull(user);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
final double lat = 37.776753;
final double lon = -122.407846;
@@ -234,7 +234,7 @@ public class GeoIT extends AbstractCoreIT {
}};
Entity user = em.create("user", properties);
assertNotNull(user);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
final double lat = 37.776753;
final double lon = -122.407846;
@@ -284,12 +284,12 @@ public class GeoIT extends AbstractCoreIT {
Entity user = em.create("user", userProperties);
assertNotNull(user);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//3. Create a connection between the user and the entity
em.createConnection(user, "likes", restaurant);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//4. Test that the user is within 2000m of the entity
Results emSearchResults = em.searchTargetEntities(user,
Query.fromQL("location within 5000 of "
@@ -326,7 +326,7 @@ public class GeoIT extends AbstractCoreIT {
assertNotNull(entity);
logger.debug("Entity {} created", entity.getProperty("name"));
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//2. validate the size of the result
Query query = new Query();
Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -367,7 +367,7 @@ public class GeoIT extends AbstractCoreIT {
assertNotNull(entity);
logger.debug("Entity {} created", entity.getProperty("name"));
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//2. validate the size of the result
Query query = new Query();
Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -540,7 +540,7 @@ public class GeoIT extends AbstractCoreIT {
em.create("store", data);
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
// just to be save
@@ -596,7 +596,7 @@ public class GeoIT extends AbstractCoreIT {
em.create("store", data);
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(2000);
// earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
@@ -669,7 +669,7 @@ public class GeoIT extends AbstractCoreIT {
em.create("store", data);
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
// just to be save
@@ -729,7 +729,7 @@ public class GeoIT extends AbstractCoreIT {
created.add(e);
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
int startDelta = size - min;
@@ -794,7 +794,7 @@ public class GeoIT extends AbstractCoreIT {
em.create("store", data);
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//do a direct geo iterator test. We need to make sure that we short circuit on the correct tile.
@@ -838,7 +838,7 @@ public class GeoIT extends AbstractCoreIT {
assertNotNull(entity);
}
//3. refresh the index
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//4. return the entity manager
return em;
}
@@ -857,7 +857,7 @@ public class GeoIT extends AbstractCoreIT {
latlong.put("longitude", longitude);
em.setProperty(entity, "location", latlong);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
index 9a3f5a6..609f977 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
@@ -79,7 +79,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
Entity user2 = em.create( "user", properties );
assertNotNull( user2 );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// define center point about 300m from that location
final double lat = 37.774277;
@@ -158,7 +158,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
Entity userFred = em.create( "user", properties );
assertNotNull( userFred );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// define center point about 300m from that location
final double lat = 37.774277;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
index d62f88e..5933b57 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
@@ -60,7 +60,7 @@ public class IndexIT extends AbstractCoreIT {
em.create( "item", properties );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
int i = 0;
@@ -133,7 +133,7 @@ public class IndexIT extends AbstractCoreIT {
em.create( "item", properties );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "name < 'delta' order by name asc" );
Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -261,7 +261,7 @@ public class IndexIT extends AbstractCoreIT {
em.create( "item", properties );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "group = 1 order by name desc" );
Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -290,7 +290,7 @@ public class IndexIT extends AbstractCoreIT {
em.create("names", entity1);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//should return valid values
Query query = Query.fromQL("select status where status = 'pickled'");
@@ -338,7 +338,7 @@ public class IndexIT extends AbstractCoreIT {
em.createConnection( entity2Ref, "connecting", entity1Ref );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//should return valid values
Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -357,7 +357,7 @@ public class IndexIT extends AbstractCoreIT {
em.update( entity1Ref );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//query and check the status has been updated, shouldn't return results
query = Query.fromQL( "select * where status = 'pickled'" );
@@ -413,7 +413,7 @@ public class IndexIT extends AbstractCoreIT {
em.createConnection( entity2Ref, "connecting", entity1Ref );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//should return valid values
Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -432,7 +432,7 @@ public class IndexIT extends AbstractCoreIT {
em.update( entity1Ref );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
//query and check the status has been updated, shouldn't return results
query = Query.fromQL( "select * where status = 'pickled'" );
@@ -500,7 +500,7 @@ public class IndexIT extends AbstractCoreIT {
}};
em.create("names", entity2);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// simple single-field select mapping
{
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
index e6ecf97..329a5be 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
@@ -28,7 +28,6 @@ import java.util.UUID;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Query.Level;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -63,7 +62,7 @@ public class PathQueryIT extends AbstractCoreIT {
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(1000);
@@ -135,7 +134,7 @@ public class PathQueryIT extends AbstractCoreIT {
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// pick an arbitrary group, ensure it has 7 users
Results ru = em.getCollection( groups.get( 2 ), "users", null, 20, Level.IDS, false );
@@ -152,7 +151,7 @@ public class PathQueryIT extends AbstractCoreIT {
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
// pick an arbitrary user, ensure it has 7 devices
Results rd = em.getCollection( users.get( 6 ), "devices", null, 20, Level.IDS, false );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
index 11f0692..1072d29 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
@@ -28,8 +28,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.RandomStringUtils;
-
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.persistence.entities.Role;
import org.apache.usergrid.persistence.Query.Level;
@@ -147,7 +145,7 @@ public class PermissionsIT extends AbstractCoreIT {
dump( "group roles", roles );
em.deleteGroupRole( group.getUuid(), "author" );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(1000);
roles = em.getGroupRoles( group.getUuid() );
@@ -156,7 +154,7 @@ public class PermissionsIT extends AbstractCoreIT {
em.addUserToGroupRole( user.getUuid(), group.getUuid(), "admin" );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Results r = em.getUsersInGroupRole( group.getUuid(), "admin", Level.ALL_PROPERTIES );
dump( "entities", r.getEntities() );
assertEquals( "proper number of users in group role not set", 1, r.size() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index 383d620..a7759de 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -125,7 +125,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
}
logger.info( "Created {} entities", ENTITIES_TO_INDEX );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex(5000);
// ----------------- test that we can read them, should work fine
@@ -163,6 +163,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
waitForRebuild( status, reIndexService );
+ app.waitForQueueDrainAndRefreshIndex(5000);
// ----------------- test that we can read the catherder collection and not the catshepard
@@ -233,7 +234,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
}
logger.info( "Created {} entities", ENTITIES_TO_INDEX );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex(15000);
// ----------------- test that we can read them, should work fine
@@ -247,6 +248,8 @@ public class RebuildIndexTest extends AbstractCoreIT {
deleteIndex( em.getApplicationId() );
+ app.waitForQueueDrainAndRefreshIndex();
+
// ----------------- test that we can read them, should fail
// deleting sytem app index will interfere with other concurrently running tests
@@ -283,7 +286,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
logger.info( "Rebuilt index" );
- app.refreshIndex();
}
catch ( Exception ex ) {
logger.error( "Error rebuilding index", ex );
@@ -292,7 +294,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
// ----------------- test that we can read them
- Thread.sleep( 2000 );
+ app.waitForQueueDrainAndRefreshIndex(15000);
readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
}
@@ -343,7 +345,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
logger.info( "Created {} entities", ENTITIES_TO_INDEX );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex(5000);
// ----------------- test that we can read them, should work fine
@@ -392,7 +394,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
logger.info( "Rebuilt index" );
- app.refreshIndex();
}
catch ( Exception ex ) {
logger.error( "Error rebuilding index", ex );
@@ -401,7 +402,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
// ----------------- test that we can read them
- Thread.sleep( 2000 );
+ app.waitForQueueDrainAndRefreshIndex(5000);
results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 3 );
assertEquals(results.size(),3);
q = Query.fromQL("select * where location within 100 of "+lat+", "+lon);
@@ -435,7 +436,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
final Entity secondEntity = em.create( "thing", entityData);
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex(5000);
// ----------------- test that we can read them, should work fine
@@ -493,7 +494,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
logger.info( "Rebuilt index" );
- app.refreshIndex();
}
catch ( Exception ex ) {
logger.error( "Error rebuilding index", ex );
@@ -502,7 +502,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
// ----------------- test that we can read them
- Thread.sleep( 2000 );
+ app.waitForQueueDrainAndRefreshIndex(5000);
countEntities( em, collectionName, 1 );
}
@@ -547,14 +547,14 @@ public class RebuildIndexTest extends AbstractCoreIT {
);
ei.deleteApplication().toBlocking().lastOrDefault( null );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
}
private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
throws Exception {
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
@@ -593,7 +593,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
private int countEntities( EntityManager em, String collectionName, int expectedEntities)
throws Exception {
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index d287d7e..3652b6f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -26,11 +26,8 @@ import java.util.UUID;
import org.apache.usergrid.Application;
import org.apache.usergrid.CoreApplication;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.utils.UUIDUtils;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +40,6 @@ import org.apache.usergrid.persistence.cassandra.util.TraceTagManager;
import org.apache.usergrid.persistence.cassandra.util.TraceTagReporter;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.setup.ConcurrentProcessSingleton;
-import rx.functions.Func0;
-import rx.functions.Func1;
import rx.functions.Func2;
import javax.annotation.concurrent.NotThreadSafe;
@@ -140,7 +135,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
Thread.sleep( 500 );
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
// wait for it to appear in delete apps list
@@ -164,7 +159,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
// delete the application
setup.getEmf().deleteApplication(deletedAppId);
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
found = findApps.call( deletedAppId, emf.getDeletedApplications() );
@@ -196,14 +191,14 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
}
}while (status.getStatus()!= ReIndexService.Status.COMPLETE);
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
// test to see that app now works and is happy
// it should not be found in the deleted apps collection
found = findApps.call( deletedAppId, emf.getDeletedApplications());
assertFalse("Restored app found in deleted apps collection", found);
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
apps = setup.getEmf().getApplications();
found = findApps.call(deletedAppId, apps);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
index e5c84f8..1f53c0a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
@@ -76,7 +76,7 @@ public class ConnectionHelper extends CollectionIoHelper {
@Override
public Results getResults( Query query ) throws Exception {
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
query.setConnectionType( CONNECTION );
query.setEntityType( "test" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
index cea3b35..dcc8eae 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
@@ -131,7 +131,7 @@ public class IntersectionTransitivePagingIT{
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(1000);
return expected;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
index 4d60164..3403dc8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
@@ -135,7 +135,7 @@ public class IntersectionUnionPagingIT {
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
logger.info( "Writes took {} ms", stop - start );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
index b2003fe..dac3f68 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
@@ -298,7 +298,7 @@ public class IteratingQueryIT {
//we have to sleep, or we kill embedded cassandra
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(1000);
long stop = System.currentTimeMillis();
@@ -367,7 +367,7 @@ public class IteratingQueryIT {
expected.add( name );
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
@@ -438,7 +438,7 @@ public class IteratingQueryIT {
}
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
logger.info( "Writes took {} ms", stop - start );
@@ -551,7 +551,7 @@ public class IteratingQueryIT {
expectedResults.add( name );
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
@@ -623,7 +623,7 @@ public class IteratingQueryIT {
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
logger.info( "Writes took {} ms", stop - start );
@@ -689,7 +689,7 @@ public class IteratingQueryIT {
}
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
logger.info( "Writes took {} ms", stop - start );
@@ -742,7 +742,7 @@ public class IteratingQueryIT {
io.writeEntity( entity );
expected.add( name );
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
@@ -803,7 +803,7 @@ public class IteratingQueryIT {
io.writeEntity( entity );
expected.add( name );
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
@@ -865,7 +865,7 @@ public class IteratingQueryIT {
expected.add( name );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
logger.info( "Writes took {} ms", stop - start );
@@ -924,7 +924,7 @@ public class IteratingQueryIT {
io.writeEntity( entity );
expected.add( name );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Thread.sleep(500);
long stop = System.currentTimeMillis();
@@ -987,7 +987,7 @@ public class IteratingQueryIT {
expected.add( name );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
logger.info( "Writes took {} ms", stop - start );
@@ -1050,7 +1050,7 @@ public class IteratingQueryIT {
io.writeEntity( entity );
expected.add( name );
}
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
@@ -1110,7 +1110,7 @@ public class IteratingQueryIT {
io.writeEntity( entity );
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
@@ -1216,7 +1216,7 @@ public class IteratingQueryIT {
logger.info( "Writes took {} ms", stop - start );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query = Query.fromQL( "select * order by boolean desc, index asc" );
query.setLimit( queryLimit );
@@ -1322,7 +1322,7 @@ public class IteratingQueryIT {
logger.info( "Writes took {} ms", stop - start );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
Query query =
Query.fromQL( "select * where intersect = true OR intersect2 = true order by created, intersect desc" );
@@ -1384,7 +1384,7 @@ public class IteratingQueryIT {
io.writeEntity( entity );
}
- this.app.refreshIndex();
+ this.app.waitForQueueDrainAndRefreshIndex();
long stop = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
index f7308da..3f5573f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
@@ -132,7 +132,7 @@ public class NotSubPropertyIT {
logger.info( "Writes took {} ms", stop - start );
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
return expected;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
index 60c1622..89641a8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
@@ -72,7 +72,7 @@ public class ParenthesisProblemIT extends AbstractCoreIT {
put("age",1);
}});
- app.refreshIndex();
+ app.waitForQueueDrainAndRefreshIndex();
final Results entities = em.searchCollection( em.getApplicationRef(), "cats", Query.fromQL(query));
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index c544967..8f9058d 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -49,6 +49,9 @@ collection.uniquevalues.authoritative.region=us-east
# Queueing Test Settings
# Reduce the long polling time for the tests
queue.long.polling.time.millis=50
+elasticsearch.worker_count=8
+elasticsearch.worker_count_utility=8
+queue.get.timeout.seconds=8
# --- End: Usergrid cluster/actor system settings
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 061807b..778274e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -165,7 +165,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
long getMaxShardSize();
@Key(QUEUE_LONG_POLL_TIME_MILLIS)
- @Default("5000")
+ @Default("1000")
long getLongPollTimeMillis();
/** Max time-to-live for queue message and payload data */
@@ -174,7 +174,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
int getMaxTtlSeconds();
@Key(QUEUE_IN_MEMORY)
- @Default("true")
+ @Default("false") // in memory not ready yet; leave this to false else msgs could be processed more than once
boolean getInMemoryCache();
@Key(QUEUE_IN_MEMORY_REFRESH_ASYNC)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 09bb8de..fa5ee0b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -59,7 +59,7 @@ public class InMemoryQueue {
}
}
- public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
+ synchronized public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
UUID newest = newestByQueueName.get( queueName );
if ( newest == null ) {
@@ -76,7 +76,7 @@ public class InMemoryQueue {
getQueue( queueName ).add( databaseQueueMessage );
}
- public UUID getNewest( String queueName ) {
+ synchronized public UUID getNewest( String queueName ) {
if ( getQueue( queueName ).isEmpty() ) {
newestByQueueName.remove( queueName );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index ac2857f..fd4257b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
@Singleton
@@ -188,7 +189,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
queueMessage.setData( json );
} catch (UnsupportedEncodingException e) {
- logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e);
+ logger.error("Error decoding data for messageId=" + queueMessage.getMessageId(), e);
}
} else {
try {
@@ -201,6 +202,12 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
}
queueMessages.add( queueMessage );
+ } else if ( (System.currentTimeMillis() - dbMessage.getQueuedAt()) > TimeUnit.HOURS.toMillis(2) ) {
+ logger.warn("Queue Message does not have corresponding data after 2 hours, removing from queue - " +
+ "queueName: {}, region: {}, queueMessageId: {}", dbMessage.getQueueName(), dbMessage.getRegion(),
+ dbMessage.getQueueMessageId());
+ queueMessageSerialization.deleteMessage(dbMessage.getQueueName(), dbMessage.getRegion(),
+ dbMessage.getShardId(), dbMessage.getType(), dbMessage.getQueueMessageId());
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 89c79ec..eb26b69 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -177,7 +178,7 @@ public class QueueActorHelper {
}
}
- newestFetchedUuid.put( queueName, since );
+ updateUUIDPointer(queueName, since);
// Shard currentShard = multiShardIterator.getCurrentShard();
// if ( currentShard != null ) {
@@ -279,7 +280,7 @@ public class QueueActorHelper {
}
- void queueRefresh( String queueName ) {
+ synchronized void queueRefresh( String queueName ) {
Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
@@ -327,7 +328,7 @@ public class QueueActorHelper {
startingShards.put( shardKey, shardId );
- lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+ updateLastRefreshedTime(queueName);
if ( count > 0 ) {
Object shard = shardIdOptional.isPresent() ? shardIdOptional.get() : "null";
@@ -346,4 +347,12 @@ public class QueueActorHelper {
return queueName + "_" + type + region;
}
+ private synchronized void updateUUIDPointer(String queueName, UUID newUUIDPointer){
+ newestFetchedUuid.put( queueName, newUUIDPointer );
+ }
+
+ private synchronized void updateLastRefreshedTime(String queueName){
+ lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 1ff8502..cbc7245 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -133,7 +133,7 @@ public class QueueActorRouter extends UntypedActor {
getContext().dispatcher(),
getSelf() );
shardAllocationSchedulersByQueueName.put( queueName, scheduler );
- logger.debug( "Created shard allocater for queue {}", queueName );
+ logger.debug( "Created shard allocator for queue {}", queueName );
}
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 19059e6..75c1c22 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -139,8 +139,8 @@ public class ShardAllocator extends UntypedActor {
shardSerialization.createShard( newShard );
shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 );
- logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
- this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+ logger.info("Allocated new shard for queue, newShardID: {}, queueName: {}, shardMessageCount: {}, usedPercent: {}%",
+ newShard.getShardId(), queueName, counterValue, (long)((double)counterValue/(double)qakkaFig.getMaxShardSize()*100) );
} else {
// logger.debug("No new shard for queue {} counterValue {} of max {}",
[2/3] usergrid git commit: Switch DISTRIBUTED database queueing to
default not cache in memory as the in memory impl causes duplicate messgae
processing quite often at the moment.
Posted by mr...@apache.org.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 98e055a..b1f72aa 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.persistence.qakka.distributed.impl;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnFailure;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.codahale.metrics.*;
@@ -42,7 +44,9 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
+import scala.concurrent.Promise;
import java.lang.reflect.Method;
import java.util.*;
@@ -235,20 +239,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
- logger.error("Akka Actor System is not ready yet for requests.");
- return Collections.EMPTY_LIST;
+ logger.warn("Akka Actor System is not ready yet for requests.");
+ return Collections.emptyList();
}
int maxRetries = qakkaFig.getMaxGetRetries();
int tries = 0;
+ boolean interrupted = false;
+
QueueGetRequest request = new QueueGetRequest( queueName, count );
while ( ++tries < maxRetries ) {
try {
Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
// ask ClientActor and wait (up to timeout) for response
-
Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t );
Object responseObject = Await.result( fut, t.duration() );
@@ -259,8 +264,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
if ( response != null && response instanceof QueueGetResponse) {
QueueGetResponse qprm = (QueueGetResponse)response;
if ( qprm.isSuccess() ) {
- if (tries > 1) {
- logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries );
+ if (tries > 1 && !interrupted) {
+ logger.warn( "getNextMessage for queue {} SUCCESS after {} tries", queueName, tries );
}
}
return qprm.getQueueMessages();
@@ -284,10 +289,13 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
}
} catch ( TimeoutException e ) {
- logger.trace("TIMEOUT popping to queue " + queueName + " retrying " + tries, e );
-
- } catch ( Exception e ) {
- logger.debug("ERROR popping to queue " + queueName + " retrying " + tries, e );
+ logger.warn("TIMEOUT popping queue " + queueName + ", attempt: " + tries, e );
+ } catch(InterruptedException e){
+ interrupted = true;
+ // this might happen, retry the ask again
+ logger.trace("Thread was marked interrupted so unable to wait for the result, attempt: {}", tries);
+ }catch ( Exception e ) {
+ logger.error("ERROR popping queue " + queueName + ", attempt: " + tries, e );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index 11f3d08..4745cb1 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -181,6 +181,9 @@ public class ShardAllocatorTest extends AbstractAkkaTest {
distributedQueueService.refresh();
+ // the shard allocator kicks in when messages are first received
+ distributedQueueService.getNextMessages(queueName,10);
+
try {
// Create number of messages
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 94bfeff..d77e7e8 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -36,7 +36,8 @@ usergrid.cluster.seeds=us-east:localhost
# Port used for cluster communications.
usergrid.cluster.port=3545
-queue.inmemory.cache=true
+# In-Memory Queueing Not Ready Yet; Leave this to false else, messages are potentially processed more than once
+queue.inmemory.cache=false
queue.num.actors=50
queue.sender.num.actors=100
@@ -47,7 +48,7 @@ queue.get.timeout.seconds=5
# set shard size and times low for testing purposes
queue.shard.max.size=10
-queue.shard.allocation.check.frequency.millis=1000
+queue.shard.allocation.check.frequency.millis=500
queue.shard.allocation.advance.time.millis=200
# set low for testing purposes
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
index 4e58935..a96d725 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/CollectionMetadataIT.java
@@ -63,7 +63,7 @@ public class CollectionMetadataIT extends AbstractRestIT {
e3 = this.app().collection(collectionName).post(e3);
assertNotNull(e3);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create connections
// e1 hates e3
@@ -73,7 +73,7 @@ public class CollectionMetadataIT extends AbstractRestIT {
// e3 has one in (hates) connection
this.app().collection(collectionName).entity(e1).connection("hates").entity(e3).post();
this.app().collection(collectionName).entity(e2).connection("likes").entity(e1).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// no query param, "all", and invalid param all the same
checkMetadata(e1, null, "hates", "likes");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
index fa68350..922c678 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/NotificationsIT.java
@@ -18,17 +18,15 @@ package org.apache.usergrid.rest;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
-import com.fasterxml.jackson.databind.JsonNode;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.MediaType;
+
import org.apache.commons.lang3.time.StopWatch;
-import org.apache.usergrid.rest.test.resource.*;
-import org.apache.usergrid.rest.test.resource.endpoints.NamedResource;
import org.apache.usergrid.rest.test.resource.model.*;
import org.apache.usergrid.rest.test.resource.model.ApiResponse;
import org.junit.After;
@@ -85,7 +83,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
String unIndexedCollectionName = "notifications";
app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create notifier
Entity notifier = new Entity().chainPut("name", "mynotifier").chainPut("provider", "noop");
@@ -103,7 +101,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
Token token = this.app().token().post(new Token("ed", "sesame"));
this.clientSetup.getRestClient().token().setToken(token);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// create devices
int devicesCount = 0;
@@ -129,7 +127,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
devicesCount++;
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String postMeterName = getClass().getSimpleName() + ".postNotifications";
Meter postMeter = registry.meter( postMeterName );
@@ -168,7 +166,7 @@ public class NotificationsIT extends org.apache.usergrid.rest.test.resource.Abst
}
registry.remove( postMeterName );
- refreshIndex( );
+ waitForQueueDrainAndRefreshIndex( );
logger.info("Waiting for all notifications to be sent");
StopWatch sw = new StopWatch();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
index 1067365..9b295f0 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/PartialUpdateTest.java
@@ -61,7 +61,7 @@ public class PartialUpdateTest extends AbstractRestIT {
String uuid = userNode.get("uuid").toString();
assertNotNull(uuid);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Map<String, Object> updateProperties = new LinkedHashMap<String, Object>();
// update user bart passing only an update to a property
@@ -81,7 +81,7 @@ public class PartialUpdateTest extends AbstractRestIT {
fail("Update failed due to: " + uie.getResponse().readEntity(String.class));
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// retrieve the user from the backend
userNode = this.app().collection("users").entity(userNode).get();
@@ -123,7 +123,7 @@ public class PartialUpdateTest extends AbstractRestIT {
} catch (ClientErrorException uie) {
fail("Update failed due to: " + uie.getResponse().readEntity(String.class));
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
userNode = this.app().collection("users").entity(userNode).get();
assertNotNull(userNode);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
index dd2a733..383c046 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/SystemResourceIT.java
@@ -24,7 +24,6 @@ import org.junit.Test;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -57,7 +56,7 @@ public class SystemResourceIT extends AbstractRestIT {
for(int i =0; i<count;i++) {
this.app().collection("tests").post(new Entity().chainPut("testval", "test"));
}
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
QueryParameters queryParameters = new QueryParameters();
queryParameters.addParam( "access_token", clientSetup.getSuperuserToken().getAccessToken() );
queryParameters.addParam("confirmApplicationName", this.clientSetup.getAppName());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
index 0f9be30..a6d987b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationCreateIT.java
@@ -104,7 +104,7 @@ public class ApplicationCreateIT extends AbstractRestIT {
.management().orgs().org( orgName ).app().post( new Application( appName ) );
UUID appId = appCreateResponse.getEntities().get(0).getUuid();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
for ( int i=0; i<5; i++ ) {
final String entityName = "entity" + i;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
index 6416cff..6521444 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
@@ -213,7 +213,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
// test that we cannot see the application in the list of applications returned
// by the management resource's get organization's applications end-point
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
ManagementResponse orgAppResponse = clientSetup.getRestClient()
.management().orgs().org( orgName ).apps().getOrganizationApplications();
@@ -295,7 +295,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
.request()
.delete();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// restore the app
@@ -308,7 +308,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
.request()
.put( javax.ws.rs.client.Entity.entity( "", MediaType.APPLICATION_JSON )); // must send body
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// test that we can see the application in the list of applications
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
index 9f4f8aa..8dabf93 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
@@ -181,7 +181,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
}
logger.info( "Waiting for app to become available" );
Thread.sleep(500);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
}
assertNotNull( clientId );
assertNotNull( clientSecret );
@@ -242,7 +242,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
assertNotNull(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//retrieve the app using a username and password
QueryParameters params = new QueryParameters()
@@ -354,7 +354,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
Entity entity = this.app().collection("users").post(user);
//assert that it was saved correctly
assertNotNull(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//add a ttl to the entity that is greater than the maximum
entity.chainPut("grant_type", "password").chainPut("ttl", Long.MAX_VALUE);
@@ -392,7 +392,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
//save the entity
Entity entity = this.app().collection("users").post(user);
assertNotNull(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Retrieve an authentication token for the user, setting the TTL
Token apiResponse = target().path( String.format( "/%s/%s/token", orgName, appName ) )
@@ -457,7 +457,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
//save the entity
Entity entity = this.app().collection("users").post(user);
assertNotNull(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
try {
//Retrieve a token for the new user, setting the TTL to an invalid value
@@ -496,7 +496,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
//save the entity
Entity entity = this.app().collection("users").post(user);
assertNotNull(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Retrieve an authentication token for the user
Token tokenResponse = this.app().getTarget( false ).path( "token" )
.queryParam( "grant_type", "password" )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
index 144893d..616d929 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
@@ -72,7 +72,7 @@ public class AssetResourceIT extends AbstractRestIT {
@Test
public void octetStreamOnDynamicEntity() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// post an asset entity
@@ -113,7 +113,7 @@ public class AssetResourceIT extends AbstractRestIT {
@Test
public void verifyMetadataChanged() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// post an entity
@@ -130,7 +130,7 @@ public class AssetResourceIT extends AbstractRestIT {
.field( "name", "verifyMetadataChangedTest" )
.field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// get entity and check asset metadata
@@ -175,7 +175,7 @@ public class AssetResourceIT extends AbstractRestIT {
.field( "name", "verifyMetadataChangedTest" )
.field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//verify that data was correctly written to backend
getResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).get( ApiResponse.class );
@@ -193,14 +193,14 @@ public class AssetResourceIT extends AbstractRestIT {
@Test
public void multipartPostFormOnDynamicEntity() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// post data larger than 5M
byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/file-bigger-than-5M" ) );
FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
ApiResponse putResponse = pathResource(getOrgAppPath("foos")).post(form);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
UUID assetId = putResponse.getEntities().get(0).getUuid();
assertNotNull(assetId);
@@ -234,7 +234,7 @@ public class AssetResourceIT extends AbstractRestIT {
@Test
public void multipartPutFormOnDynamicEntity() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// post an entity
@@ -250,7 +250,7 @@ public class AssetResourceIT extends AbstractRestIT {
.field( "foo", "bar2" )
.field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// get entity and check asset metadata
@@ -283,7 +283,7 @@ public class AssetResourceIT extends AbstractRestIT {
@Test
public void largeFileInS3() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// upload file larger than 5MB
@@ -310,7 +310,7 @@ public class AssetResourceIT extends AbstractRestIT {
@Test
public void fileTooLargeShouldResultInError() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// set max file size down to 6mb
@@ -354,7 +354,7 @@ public class AssetResourceIT extends AbstractRestIT {
@Test
public void deleteConnectionToAsset() throws IOException {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// create the entity that will be the asset, an image
@@ -378,7 +378,7 @@ public class AssetResourceIT extends AbstractRestIT {
ApiResponse connectResponse = pathResource(
getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).post( ApiResponse.class );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// verify connection from imagegallery to asset
@@ -389,7 +389,7 @@ public class AssetResourceIT extends AbstractRestIT {
// delete the connection
pathResource( getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).delete();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// verify that connection is gone
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
index ad12975..4a9bfaa 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
@@ -205,7 +205,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
@Test
public void octetStreamOnDynamicEntity() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// post an asset entity
@@ -246,14 +246,14 @@ public class AwsAssetResourceIT extends AbstractRestIT {
@Test
public void multipartPostFormOnDynamicEntity() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// post data larger than 5M
byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/file-bigger-than-5M" ) );
FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
ApiResponse putResponse = pathResource(getOrgAppPath("foos")).post(form);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
UUID assetId = putResponse.getEntities().get(0).getUuid();
assertNotNull(assetId);
@@ -287,7 +287,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
@Test
public void multipartPutFormOnDynamicEntity() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// post an entity
@@ -303,7 +303,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
.field( "foo", "bar2" )
.field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
ApiResponse putResponse = pathResource( getOrgAppPath( "foos/" + assetId ) ).put( form );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// get entity and check asset metadata
@@ -336,7 +336,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
@Test
public void largeFileInS3() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// upload file larger than 5MB
@@ -363,7 +363,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
@Test
public void fileTooLargeShouldResultInError() throws Exception {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// set max file size down to 6mb
setTestProperty( "usergrid.binary.max-size-mb","6" );
@@ -383,7 +383,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
// attempt to get asset entity, it should contain error
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
ApiResponse getResponse = pathResource( getOrgAppPath( "bars/" +assetId ) ).get( ApiResponse.class );
Map<String, Object> fileMetadata = (Map<String, Object>)getResponse.getEntities().get(0).get("file-metadata");
assertNotNull( fileMetadata );
@@ -403,7 +403,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
@Test
public void deleteConnectionToAsset() throws IOException {
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// create the entity that will be the asset, an image
@@ -427,7 +427,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
ApiResponse connectResponse = pathResource(
getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).post( ApiResponse.class );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// verify connection from imagegallery to asset
@@ -438,7 +438,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
// delete the connection
pathResource( getOrgAppPath( "imagegalleries/" + imageGalleryId + "/contains/" + uuid ) ).delete();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// verify that connection is gone
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
index b453ed2..b63400a 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/BrowserCompatibilityTest.java
@@ -69,7 +69,7 @@ public class BrowserCompatibilityTest extends org.apache.usergrid.rest.test.reso
Entity entity = this.app().collection("things").post(payload);
assertEquals(entity.get("name"), name);
String uuid = entity.getAsString("uuid");
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//now get this new entity with "text/html" in the accept header
Entity returnedEntity = this.app().collection("things").withAcceptHeader(acceptHeader).entity(entity).get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
index d72054a..bf06c21 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
@@ -136,7 +136,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
fail("This should return a success.");
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection collection = this.app().collection( "testCollections" ).collection( "_settings" ).get();
@@ -159,7 +159,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
Entity thing = this.app().collection( "testCollections" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//The above verifies the test case.
@@ -172,7 +172,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post entity.
Entity postedEntity = this.app().collection( "testCollections" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
String query = "two ='query'";
@@ -198,11 +198,11 @@ public class CollectionsResourceIT extends AbstractRestIT {
//next part is to delete the schema then reindex it and it should work.
this.app().collection( "testCollections" ).collection( "_settings" ).delete();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
this.app().collection( "testCollections" ).collection( "_reindex" )
.post(true,clientSetup.getSuperuserToken(),ApiResponse.class,null,null,false);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
@@ -233,14 +233,14 @@ public class CollectionsResourceIT extends AbstractRestIT {
Entity payload = new Entity();
payload.put( "fields", "all");
app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// post entity with two fields
Entity testEntity = new Entity();
testEntity.put( "one", "helper" );
testEntity.put( "two","query" );
app().collection( "testCollection" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// verify it can be queried on both fields
@@ -288,7 +288,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
Entity thing = this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Reindex and verify that the entity only has field one index.
@@ -339,7 +339,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
Entity thing = this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection collection = this.app().collection( "testCollection" ).collection( "_settings" ).get();
@@ -419,7 +419,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Create test collection with a test entity that is partially indexed.
Entity testEntity = new Entity();
@@ -428,7 +428,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post entity.
this.app().collection( "testCollection" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
String query = "two ='query'";
@@ -461,7 +461,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Map<String,Object> arrayFieldsForTesting = new HashMap<>();
@@ -475,7 +475,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post entity.
this.app().collection( "testCollection" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
String query = "one.key = 'value'";
@@ -511,7 +511,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Map<String,Object> arrayFieldsForTesting = new HashMap<>();
@@ -525,7 +525,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post entity.
this.app().collection( "testCollection" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
String query = "one.key = 'value'";
@@ -554,7 +554,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Map<String,Object> arrayFieldsForTestingSelectiveIndexing = new HashMap<>();
@@ -573,7 +573,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post entity.
this.app().collection( "testCollection" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
String query = "one.key.wowMoreKeys = 'value'";
@@ -609,7 +609,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Map<String,Object> arrayFieldsForTestingSelectiveIndexing = new HashMap<>();
@@ -629,7 +629,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post entity.
this.app().collection( "testCollection" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
String query = "name = 'howdy'";
@@ -660,7 +660,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post index to the collection metadata
this.app().collection( "testCollection" ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Create test collection with a test entity that is partially indexed.
Entity testEntity = new Entity();
@@ -669,11 +669,11 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Post entity.
Entity postedEntity = this.app().collection( "testCollection" ).post( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
testEntity.put( "one","three" );
this.app().collection( "testCollection" ).entity( postedEntity.getUuid() ).put( testEntity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Do a query to see if you can find the indexed query.
String query = "one = 'three'";
@@ -715,7 +715,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
Entity user = this.app().collection("users").post(payload);
assertEquals(user.get("username"), username);
assertEquals(user.get("email"), email);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
String collectionName = "nestprofiles";
//create a permission with the path "me" in it
@@ -743,7 +743,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
Entity nestProfile = this.app().collection(collectionName).post(payload);
assertEquals(nestProfile.get("name"), profileName);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
Entity nestprofileReturned = this.app().collection(collectionName).entity(nestProfile).get();
assertEquals(nestprofileReturned.get("name"), profileName);
@@ -766,7 +766,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
assertEquals( calendarlistOne.get( "summaryOverview" ), summaryOverview );
assertEquals(calendarlistOne.get("caltype"), calType);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//post a second entity
payload = new Entity();
@@ -819,9 +819,9 @@ public class CollectionsResourceIT extends AbstractRestIT {
assertNotSame( null,
((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName.toLowerCase() ));
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
this.app().collection( collectionName ).entity( testEntity.getEntity().getUuid() ).delete();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//Verify that the collection still exists despite deleting its only entity.)
@@ -850,7 +850,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
payload.put("name", name);
Entity user = this.app().collection("app_users").post(payload);
assertEquals(user.get("name"), name);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
Entity user2 = this.app().collection("app_users").entity(user).get();
@@ -880,7 +880,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
String randomizer = RandomStringUtils.randomAlphanumeric(10);
String collectionName = "col_" + randomizer;
app().collection( collectionName ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// was the no-index wildcard saved and others ignored?
Collection collection = app().collection( collectionName ).collection( "_settings" ).get();
@@ -923,7 +923,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
String randomizer = RandomStringUtils.randomAlphanumeric(10);
String unIndexedCollectionName = "col_" + randomizer;
app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String entityName1 = "unindexed1";
Entity unindexed1 = this.app().collection( unIndexedCollectionName )
@@ -982,7 +982,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
String unIndexedCollectionName = "col_" + randomizer;
app().collection( unIndexedCollectionName ).collection( "_settings" ).post( payload );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String entityName1 = "unindexed1";
Entity unindexed1 = this.app().collection( unIndexedCollectionName )
@@ -1018,7 +1018,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
app().collection( collectionName ).collection( "_settings" )
.post( new Entity().chainPut( "fields", "all" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get collection settings, should see no region
@@ -1051,7 +1051,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
app().collection( collectionName ).collection( "_settings" )
.post( new Entity().chainPut( REGION_SETTING, "" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get collection settings, should see no region
@@ -1091,14 +1091,14 @@ public class CollectionsResourceIT extends AbstractRestIT {
this.app().collection("notifications/"+ UUIDUtils.newTimeUUID()).post(payload );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
Collection user2 = this.app().collection("notifications").get();
assertEquals(1,user2.getNumOfEntities());
this.app().collection("notifications/"+ UUIDUtils.newTimeUUID()).put(null,payload );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
user2 = this.app().collection("notifications").get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
index 0776705..7e1c5a5 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/DuplicateNameIT.java
@@ -41,7 +41,7 @@ public class DuplicateNameIT extends AbstractRestIT {
entity.put("name", "enzo");
//Create an entity named "enzo" in the "things" collection
entity = this.app().collection(collectionName).post(entity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
try {
// Try to create a second entity in "things" with the name "enzo".
this.app().collection(collectionName).post(entity);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
index c7f39b2..a9f5fee 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/ActivityResourceIT.java
@@ -59,7 +59,7 @@ public class ActivityResourceIT extends AbstractRestIT {
this.activityDesc = "testActivity" ;
this.activity = new ActivityEntity().putActor(current).chainPut("title", activityTitle).chainPut("content", activityDesc).chainPut("category", "testCategory").chainPut("verb", "POST");
this.groupActivityResource = groupsResource.entity(entity).activities();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
}
@@ -87,7 +87,7 @@ public class ActivityResourceIT extends AbstractRestIT {
{
throw e;
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection results = groupActivityResource.get();
@@ -111,7 +111,7 @@ public class ActivityResourceIT extends AbstractRestIT {
usersResource.entity(current).activities().post(activity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection results = usersResource.entity(current).activities().get();
@@ -136,7 +136,7 @@ public class ActivityResourceIT extends AbstractRestIT {
this.app().collection("activities").post(activity);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Collection results = this.app().collection("activities").get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
index d61d363..4ba8977 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/activities/PutTest.java
@@ -17,7 +17,6 @@
package org.apache.usergrid.rest.applications.collection.activities;
-import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -28,7 +27,6 @@ import org.apache.usergrid.rest.test.resource.AbstractRestIT;
import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
import org.apache.usergrid.rest.test.resource.model.Collection;
import org.apache.usergrid.rest.test.resource.model.Entity;
-import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +58,7 @@ public class PutTest extends AbstractRestIT {
Entity activity = activities.post(new Entity(props));
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String query = "select * ";
@@ -72,7 +70,7 @@ public class PutTest extends AbstractRestIT {
props.put( "actor", newActor );
Entity activity = activities.post(new Entity(props));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
collection = activities.get( );
assertEquals( 6, collection.getResponse().getEntities().size() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
index 67cf19f..b73bcbd 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/devices/DevicesResourceIT.java
@@ -17,11 +17,8 @@
package org.apache.usergrid.rest.applications.collection.devices;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
-import com.fasterxml.jackson.databind.JsonNode;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.rest.test.resource.AbstractRestIT;
import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
@@ -35,7 +32,6 @@ import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
-import org.junit.Ignore;
import javax.ws.rs.ClientErrorException;
@@ -51,7 +47,7 @@ public class DevicesResourceIT extends AbstractRestIT {
CollectionEndpoint devicesResource =this.app().collection("devices");
Entity entity = devicesResource.entity(uuid).put(payload);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create
assertNotNull( entity );
@@ -62,7 +58,7 @@ public class DevicesResourceIT extends AbstractRestIT {
ApiResponse deleteResponse =devicesResource.entity(uuid).delete();
assertNotNull(deleteResponse.getEntities().get(0));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// check deleted
try {
@@ -72,7 +68,7 @@ public class DevicesResourceIT extends AbstractRestIT {
catch ( ClientErrorException e ) {
assertEquals( 404, e.getResponse().getStatus() );
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create again
entity = devicesResource.entity(uuid).put(payload);
@@ -80,7 +76,7 @@ public class DevicesResourceIT extends AbstractRestIT {
assertNotNull( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// check existence
entity = devicesResource.entity(uuid).get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
index 769852d..b94050b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/groups/GroupResourceIT.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.apache.usergrid.rest.test.resource.AbstractRestIT;
import org.apache.usergrid.rest.test.resource.model.Collection;
import org.apache.usergrid.rest.test.resource.model.Entity;
-import org.junit.Ignore;
import org.junit.Test;
import javax.ws.rs.ClientErrorException;
@@ -58,7 +57,7 @@ public class GroupResourceIT extends AbstractRestIT {
Entity entity = this.app().collection("groups").post(payload);
assertEquals(entity.get("name"), groupName);
assertEquals(entity.get("path"), groupPath);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
return entity;
}
@@ -74,7 +73,7 @@ public class GroupResourceIT extends AbstractRestIT {
Entity entity = this.app().collection("roles").post(payload);
assertEquals(entity.get("name"), roleName);
assertEquals(entity.get("title"), roleTitle);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
return entity;
}
@@ -91,7 +90,7 @@ public class GroupResourceIT extends AbstractRestIT {
Entity entity = this.app().collection("users").post(payload);
assertEquals(entity.get("username"), username);
assertEquals(entity.get("email"), email);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
return entity;
}
@@ -180,7 +179,7 @@ public class GroupResourceIT extends AbstractRestIT {
group.put("path", newGroupPath);
Entity groupResponse = this.app().collection("groups").entity(group).put(group);
assertEquals(groupResponse.get("path"), newGroupPath);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//4. do a GET to verify the property really was set
groupResponseGET = this.app().collection("groups").entity(group).get();
@@ -223,7 +222,7 @@ public class GroupResourceIT extends AbstractRestIT {
// 3. add the user to the group
Entity response = this.app().collection("users").entity(user).connection().collection("groups").entity(group).post();
assertEquals(response.get("name"), groupName);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// 4. make sure the user is in the group
Collection collection = this.app().collection("groups").entity(group).connection().collection("users").get();
@@ -237,7 +236,7 @@ public class GroupResourceIT extends AbstractRestIT {
//6. remove the user from the group
this.app().collection("group").entity(group).connection().collection("users").entity(user).delete();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//6. make sure the connection no longer exists
collection = this.app().collection("group").entity(group).connection().collection("users").get();
@@ -266,12 +265,12 @@ public class GroupResourceIT extends AbstractRestIT {
String roleName = "tester";
String roleTitle = "tester";
Entity role = this.createRole(roleName, roleTitle);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//3. add role to the group
Entity response = this.app().collection("roles").entity(role).connection().collection("groups").entity(group).post();
assertEquals(response.get("name"), groupName);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//4. make sure the role is in the group
Collection collection = this.app().collection("groups").entity(group).connection().collection("roles").get();
@@ -280,7 +279,7 @@ public class GroupResourceIT extends AbstractRestIT {
//5. remove Role from the group (should only delete the connection)
this.app().collection("groups").entity(group).connection().collection("roles").entity(role).delete();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//6. make sure the connection no longer exists
collection = this.app().collection("groups").entity(group).connection().collection("roles").get();
@@ -294,7 +293,7 @@ public class GroupResourceIT extends AbstractRestIT {
//8. delete the role
this.app().collection("role").entity(role).delete();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
Thread.sleep(5000);
//9. do a GET to make sure the role was deleted
@@ -359,7 +358,7 @@ public class GroupResourceIT extends AbstractRestIT {
payload.put("name", catName);
Entity fluffy = this.app().collection("cats").post(payload);
assertEquals(fluffy.get("name"), catName);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//10. get the cat - permissions should allow this
fluffy = this.app().collection("cats").uniqueID(catName).get();
@@ -436,7 +435,7 @@ public class GroupResourceIT extends AbstractRestIT {
//7. get all the users in the groups
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
Collection usersInGroup = this.app().collection("groups").uniqueID(groupName).connection("users").get();
assertEquals(usersInGroup.getResponse().getEntityCount(), 2);
@@ -444,7 +443,7 @@ public class GroupResourceIT extends AbstractRestIT {
this.app().collection("role").uniqueID("Default").delete();
Entity data = new Entity().chainPut("name", "group1role");
this.app().collection("roles").post(data);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
Entity perms = new Entity();
String permission = "get,post,put,delete:/groups/" + group.getUuid() + "/**";
@@ -452,18 +451,18 @@ public class GroupResourceIT extends AbstractRestIT {
this.app().collection("roles").uniqueID("group1role").connection("permissions").post(perms);
this.app().collection("roles").uniqueID("group1role").connection("users").uniqueID( user1Username ).post();
this.app().collection("roles").uniqueID("group1role").connection("users").uniqueID( user2Username ).post();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//7b. everybody gets access to /activities
perms = new Entity();
permission = "get:/activities/**";
perms.put("permission",permission);
this.app().collection("roles").uniqueID("Guest").connection("permissions").post(perms);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user1Username ).post();
this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user2Username ).post();
this.app().collection("roles").uniqueID("Guest").connection("users").uniqueID( user3Username ).post();
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//8. post an activity to the group
@@ -482,7 +481,7 @@ public class GroupResourceIT extends AbstractRestIT {
Entity activityResponse = this.app().collection("groups")
.uniqueID(groupName).connection("activities").post(activity);
assertEquals(activityResponse.get("content"), content);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
//11. log user1 in, should then be using the app user's token not the admin token
this.getAppUserToken(user1Username, password);
@@ -565,7 +564,7 @@ public class GroupResourceIT extends AbstractRestIT {
group.put("title", newTitle);
Entity groupResponse = this.app().collection("groups").entity(group).put(group);
assertEquals(groupResponse.get("title"), newTitle);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
// update that group by giving it a new title and using UUID in URL
String evenNewerTitle = "Even New Title";
@@ -573,6 +572,6 @@ public class GroupResourceIT extends AbstractRestIT {
String uuid = group.getAsString("uuid");
groupResponse = this.app().collection("groups").uniqueID(uuid).put(group);
assertEquals(groupResponse.get("title"), evenNewerTitle);
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
index 4ca46b1..da15b2e 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java
@@ -130,7 +130,7 @@ public class PagingResourceIT extends AbstractRestIT {
ApiResponse response = this.app().collection( collectionName ).delete( queryParameters );
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
if(validate)
assertEquals("Entities should have been deleted", deletePageSize,response.getEntityCount() );
@@ -268,7 +268,7 @@ public class PagingResourceIT extends AbstractRestIT {
entityPayload.put( "name", created );
Entity entity = new Entity( entityPayload );
entity = this.app().collection( collectionName ).post( entity );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
if(created == 1){
connectedEntity = entity;
}
@@ -277,7 +277,7 @@ public class PagingResourceIT extends AbstractRestIT {
}
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
QueryParameters qp = new QueryParameters();
qp.setQuery("select * order by created asc");
@@ -323,7 +323,7 @@ public class PagingResourceIT extends AbstractRestIT {
this.app().collection( collectionName ).post( entity );
}
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Creates query looking for entities with the very stop.
String query = "select * where verb = 'stop'";
@@ -454,7 +454,7 @@ public class PagingResourceIT extends AbstractRestIT {
}
}
- this.refreshIndex();
+ this.waitForQueueDrainAndRefreshIndex();
return entities;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
index 6202c6a..7315cad 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
@@ -63,7 +63,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
Entity objectOfDesire = new Entity();
objectOfDesire.put( "codingmunchies", "doritoes" );
objectOfDesire = this.app().collection( "snacks" ).post( objectOfDesire );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Entity toddWant = this.app().collection( "users" ).entity( todd ).collection( "likes" ).collection( "snacks" )
.entity( objectOfDesire ).post();
@@ -93,11 +93,11 @@ public class ConnectionResourceTest extends AbstractRestIT {
thing2.put( "name", "thing2" );
thing2 = this.app().collection( "things" ).post( thing2 );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//create the connection: thing1 likes thing2
this.app().collection( "things" ).entity( thing1 )
.connection("likes").collection( "things" ).entity( thing2 ).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//test we have the "likes" in our connection meta data response
thing1 = this.app().collection( "things" ).entity( thing1 ).get();
@@ -150,14 +150,14 @@ public class ConnectionResourceTest extends AbstractRestIT {
thing2.put( "name", "thing2" );
thing2 = this.app().collection( "things" ).post( thing2 );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//create the connection: thing1 likes thing2
this.app().collection( "things" ).entity( thing1 )
.connection("likes").collection( "things" ).entity( thing2 ).post();
//delete thing2
this.app().collection( "things" ).entity( thing2 ).delete();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
try {
//attempt to retrieve thing1
@@ -185,14 +185,14 @@ public class ConnectionResourceTest extends AbstractRestIT {
thing2.put( "name", "thing2" );
thing2 = this.app().collection( "things" ).post( thing2 );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//create the connection: thing1 likes thing2
this.app().collection( "things" ).entity( thing1 )
.connection("likes").collection( "things" ).entity( thing2 ).post();
//delete thing1
this.app().collection( "things" ).entity( thing1 ).delete();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
try {
//attempt to retrieve thing1
@@ -236,7 +236,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
//connect thing1 -> thing3
connectionEndpoint.entity( thing3 ).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//now do a GET, we should see thing2 then thing3
@@ -257,7 +257,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
//now re-post thing 2 it should appear second
connectionEndpoint.entity( thing2 ).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
final ApiResponse order2 = connectionEndpoint.get().getResponse();
@@ -304,7 +304,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
//connect thing1 -> thing3
connectionEndpoint.entity( thing3 ).put( thing3 );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//now do a GET, we should see thing2 then thing3
@@ -325,7 +325,7 @@ public class ConnectionResourceTest extends AbstractRestIT {
//now re-post thing 2 it should appear second
connectionEndpoint.entity( thing2 ).put( thing2 );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
final ApiResponse order2 = connectionEndpoint.get().getResponse();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
index cfd08e5..3393582 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/OwnershipResourceIT.java
@@ -17,19 +17,15 @@
package org.apache.usergrid.rest.applications.collection.users;
-import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.usergrid.rest.test.resource.AbstractRestIT;
import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
import org.apache.usergrid.rest.test.resource.model.*;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.apache.usergrid.utils.MapUtils;
-
import javax.ws.rs.ClientErrorException;
import static org.junit.Assert.assertEquals;
@@ -65,7 +61,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
user1 = new User(this.usersResource.post(user1));
user2 = new User(this.usersResource.post(user2));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
}
@@ -95,7 +91,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
//Revoke the user1 token
usersResource.entity(user1).connection("revoketokens").post(new Entity().chainPut("token", token.getAccessToken()));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//See if we can still access the me entity after revoking its token
try {
@@ -127,7 +123,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
// create device 1 on user1 devices
usersResource.entity("me").collection("devices")
.post(new Entity( ).chainPut("name", "device1").chainPut("number", "5551112222"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Clear the current user token
this.app().token().clearToken();
@@ -137,7 +133,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
usersResource.entity("me").collection("devices")
.post(new Entity( ).chainPut("name", "device2").chainPut("number", "5552223333"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Check that we can get back device1 on user1
token = this.app().token().post(new Token(user1.getUsername(),"password"));
@@ -236,13 +232,13 @@ public class OwnershipResourceIT extends AbstractRestIT {
// create a 4peaks restaurant
Entity data = this.app().collection("restaurants").post(new Entity().chainPut("name", "4peaks"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Create a restaurant and link it to user1/me
Entity fourPeaksData = usersResource.entity("me")
.connection("likes").collection( "restaurants" ).entity( "4peaks" ).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// anonymous user
this.app().token().clearToken();
@@ -252,11 +248,11 @@ public class OwnershipResourceIT extends AbstractRestIT {
data = this.app().collection("restaurants")
.post(new Entity().chainPut("name", "arrogantbutcher"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
data = usersResource.entity("me").connection( "likes" ).collection( "restaurants" )
.entity( "arrogantbutcher" ).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String arrogantButcherId = data.getUuid().toString();
@@ -390,7 +386,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
//Sets up the cities collection with the city tempe
Entity city = this.app().collection("cities").post(new Entity().chainPut("name", "tempe"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create a 4peaks restaurant that is connected by a like to tempe.
Entity data = this.app().collection("cities").entity( "tempe" ).connection( "likes" )
@@ -410,7 +406,7 @@ public class OwnershipResourceIT extends AbstractRestIT {
CollectionEndpoint likeRestaurants =
this.app().collection("cities").entity( "tempe" ).connection( "likes" );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// check we can get the resturant entities back via uuid without a collection name
data = likeRestaurants.entity( peaksId ).get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
index aff952b..2dddcf6 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
@@ -66,7 +66,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
user = new User(USER,USER,USER+"@apigee.com","password");
user = new User( this.app().collection("users").post(user));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
}
@@ -86,13 +86,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertEquals( ROLE, node.get("name").toString() );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Post the user with a specific role into the users collection
node = this.app().collection("roles").entity(node).collection("users").entity(USER).post();
assertNull( node.get( "error" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// now check the user has the role
node = this.app().collection("users").entity(USER).collection("roles").entity(ROLE).get();
@@ -104,7 +104,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
// now delete the role
this.app().collection("users").entity(USER).collection("roles").entity(ROLE).delete();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// check if the role was deleted
@@ -136,14 +136,14 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertNull( node.get( "error" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Create a user that is in the group.
node = this.app().collection("groups").entity(groupPath).collection("users").entity(user).post();
assertNull( node.get( "error" ) );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Get the user and make sure that they are part of the group
Collection groups = this.app().collection("users").entity(user).collection("groups").get();
@@ -157,7 +157,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertNull( response.getError() );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Check that the user no longer exists in the group
int status = 0;
@@ -193,7 +193,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertNull( entity.getError() );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// now try to add permission as the user, this should work
addPermission( "usercreatedrole", "get,put,post:/foo/**" );
@@ -247,13 +247,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertNull( node.getError() );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// delete the default role to test permissions later
ApiResponse response = this.app().collection("roles").entity("default").delete();
assertNull( response.getError() );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// Grants a permission to GET, POST, and PUT the reviews url for the reviewer role
addPermission( "reviewer", "get,put,post:/reviews/**" );
@@ -266,22 +266,22 @@ public class PermissionsResourceIT extends AbstractRestIT {
this.app().collection("groups").post(group);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// Adds the reviewer to the reviewerGroup
this.app().collection("groups").entity("reviewergroup").collection("roles").entity("reviewer").post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// Adds reviewer2 user to the reviewergroup
this.app().collection("users").entity("reviewer2").collection("groups").entity("reviewergroup").post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// Adds reviewer1 to the reviewer role
this.app().collection("users").entity("reviewer1").collection("roles").entity("reviewer").post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// Set the current context to reviewer1
this.app().token().post(new Token("reviewer1","password"));
@@ -295,7 +295,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
.chainPut ("rating", "4").chainPut( "name", "4peaks").chainPut("review", "Huge beer selection" );
this.app().collection("reviews").post(review);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get the reviews and assert they were created
QueryParameters queryParameters = new QueryParameters();
@@ -330,7 +330,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertEquals( Response.Status.UNAUTHORIZED.getStatusCode(), status );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//TODO: maybe make this into two different tests?
@@ -346,7 +346,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
.chainPut( "rating", "4" ).chainPut("name", "currycorner").chainPut( "review", "Authentic" );
this.app().collection("reviews").post(review);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// get all reviews as reviewer2
queryParameters = new QueryParameters();
@@ -372,7 +372,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertEquals( Response.Status.UNAUTHORIZED.getStatusCode(), status );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
status = 0;
@@ -409,7 +409,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
Entity data = new Entity().chainPut("name", "reviewer");
this.app().collection("roles").post(data);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// allow access to reviews excluding delete
addPermission( "reviewer",
@@ -433,13 +433,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
"wildcardpermusertwo@apigee.com" );
assertNotNull( userTwoId );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Add user1 to the reviewer role
this.app().collection("users").entity(userOneId).collection("roles").entity("reviewer").post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Add a book to the books collection
Entity book = new Entity().chainPut( "title", "Ready Player One" ).chainPut("author", "Earnest Cline");
@@ -449,7 +449,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
assertEquals( "Ready Player One", book.get("title").toString() );
String bookId = book.get("uuid").toString();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//Switch the contex to be that of user1
this.app().token().post(new Token("wildcardpermuserone","password"));
@@ -461,7 +461,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
review = this.app().collection("reviews").post(review);
String reviewId = review.get("uuid").toString();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// POST https://api.usergrid.com/my-org/my-app/users/me/wrote/review/${reviewId}
this.app().collection("users").entity("me").connection("wrote").collection("review").entity(reviewId).post();
@@ -469,13 +469,13 @@ public class PermissionsResourceIT extends AbstractRestIT {
// POST https://api.usergrid.com/my-org/my-app/users/me/reviewed/review/${reviewId}
this.app().collection("users").entity("me").connection("reviewed").collection("books").entity(bookId).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// POST https://api.usergrid.com/my-org/my-app/books/${bookId}/review/${reviewId}
this.app().collection("books").entity(bookId).collection("review").entity(reviewId).post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// now try to post the same thing to books to verify as userOne does not have correct permissions
int status = 0;
@@ -522,7 +522,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
//allow patients to add doctors as their followers
addPermission( "patient", "delete,post:/users/*/following/users/${user}" );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// create examplepatient
UUID patientId = createRoleUser( "examplepatient", "examplepatient@apigee.com" );
@@ -531,12 +531,12 @@ public class PermissionsResourceIT extends AbstractRestIT {
// create exampledoctor
UUID doctorId = createRoleUser( "exampledoctor", "exampledoctor@apigee.com" );
assertNotNull( doctorId );
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
// assign examplepatient the patient role
this.app().collection("users").entity(patientId).collection("roles").entity("patient").post();
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
this.app().token().post(new Token("examplepatient","password"));
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
//not working yet, used to be ignored
// this.app().collection("users").entity("exampledoctor").connection("following")
// .collection("users").entity("examplepatient").post();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
index d5f7163..ddb1557 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/RetrieveUsersTest.java
@@ -17,19 +17,15 @@
package org.apache.usergrid.rest.applications.collection.users;
-import java.util.HashMap;
import java.util.Map;
-import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.usergrid.rest.test.resource.AbstractRestIT;
import org.apache.usergrid.rest.test.resource.endpoints.CollectionEndpoint;
-import org.apache.usergrid.rest.test.resource.endpoints.EntityEndpoint;
import org.apache.usergrid.rest.test.resource.model.Entity;
import org.apache.usergrid.rest.test.resource.model.QueryParameters;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +51,7 @@ public class RetrieveUsersTest extends AbstractRestIT {
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
String query = "select *";
String incorrectQuery = "select * where username = 'Alica'";
@@ -72,7 +68,7 @@ public class RetrieveUsersTest extends AbstractRestIT {
props.put( "username", "Nina" );
Entity entity = users.post(props);
- refreshIndex();
+ waitForQueueDrainAndRefreshIndex();
Map<String,Object> metadata = (Map)entity.get( "metadata" );
Map<String,Object> sets = (Map)metadata.get( "sets" );