You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/02/25 20:56:30 UTC

[01/10] usergrid git commit: Added a test to try to prove the issue with collections that you create.

Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 3ea2b3c04 -> ee356ec75


Added a test to try to prove the issue with collections that you create.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/12d344ff
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/12d344ff
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/12d344ff

Branch: refs/heads/release-2.1.1
Commit: 12d344ff44cee242f448e3738869aa21a9dfb3d5
Parents: 93b864f
Author: George Reyes <gr...@apache.org>
Authored: Thu Feb 18 14:48:36 2016 -0800
Committer: George Reyes <gr...@apache.org>
Committed: Thu Feb 18 14:48:36 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 26 ++++++++++-
 .../applications/ApplicationResource.java       | 27 +++++++----
 .../rest/applications/ApplicationDeleteIT.java  | 12 ++---
 .../collection/CollectionsResourceIT.java       | 49 ++++++++++++++++++++
 4 files changed, 97 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/12d344ff/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6c2ef0b..d2f549b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -745,7 +745,31 @@ public class CpEntityManager implements EntityManager {
     @Override
     public Set<String> getApplicationCollections() throws Exception {
 
-        return getRelationManager( getApplication() ).getCollections();
+        Set<String> existingCollections = getRelationManager( getApplication() ).getCollections();
+
+        Set<String> dynamic_collections = cast( getDictionaryAsSet( getApplicationRef(), Schema.DICTIONARY_COLLECTIONS ) );
+        if ( dynamic_collections != null ) {
+            for ( String collection : dynamic_collections ) {
+                if ( !Schema.isAssociatedEntityType( collection ) ) {
+                    if(!existingCollections.contains( collection )) {
+                        existingCollections.add( collection );
+                    }
+                }
+            }
+        }
+        Set<String> system_collections = Schema.getDefaultSchema().getCollectionNames( Application.ENTITY_TYPE );
+        if ( system_collections != null ) {
+            for ( String collection : system_collections ) {
+                if ( !Schema.isAssociatedEntityType( collection ) ) {
+                    if(!existingCollections.contains( collection )) {
+                        existingCollections.add( collection );
+                    }
+                }
+            }
+        }
+
+        return existingCollections;
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/12d344ff/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
index fd137a0..61a7e0a 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
@@ -482,20 +482,27 @@ public class ApplicationResource extends AbstractContextResource {
     @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
     public ApiResponse executeDelete(  @Context UriInfo ui,
         @QueryParam("callback") @DefaultValue("callback") String callback,
-        @QueryParam("app_delete_confirm") String confirmDelete) throws Exception {
+        @QueryParam("application_identifier") String applicationConfirmedDelete) throws Exception {
 
-        if (!"confirm_delete_of_application_and_data".equals( confirmDelete ) ) {
+        //If the path uses name then expect name, otherwise if they use uuid then expect uuid.
+        if(application==null){
+            if(!applicationId.toString().equals( applicationConfirmedDelete )){
+                throw new IllegalArgumentException(
+                    "Cannot delete application without supplying correct application id.");
+            }
+        }
+        else if (!application.getName().equals( applicationConfirmedDelete ) ) {
             throw new IllegalArgumentException(
-                "Cannot delete application without app_delete_confirm parameter");
+                "Cannot delete application without supplying correct application name");
         }
 
-        Properties props = management.getProperties();
-
-        // for now, only works in test mode
-        String testProp = ( String ) props.get( "usergrid.test" );
-        if ( testProp == null || !Boolean.parseBoolean( testProp ) ) {
-            throw new UnsupportedRestOperationException("Test props not not functioning correctly.");
-        }
+//        Properties props = management.getProperties();
+//
+//         //for now, only works in test mode
+//        String testProp = ( String ) props.get( "usergrid.test" );
+//        if ( testProp == null || !Boolean.parseBoolean( testProp ) ) {
+//            throw new UnsupportedRestOperationException("Test props not not functioning correctly.");
+//        }
 
         if ( applicationId == null ) {
             throw new IllegalArgumentException("Application ID not specified in request");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/12d344ff/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
index d215256..7b4751d 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationDeleteIT.java
@@ -84,7 +84,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         clientSetup.getRestClient().management().orgs()
             .org(orgName).apps().app(appToDeleteId.toString() ).getTarget()
             .queryParam("access_token", orgAdminToken.getAccessToken() )
-            .queryParam("app_delete_confirm", "confirm_delete_of_application_and_data")
+            .queryParam("application_identifier", appToDeleteId)
             .request()
             .delete();
 
@@ -175,7 +175,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         final Response response1 = clientSetup.getRestClient().management()
             .orgs().org( orgName ).apps().app( appToDeleteId.toString() )
             .getTarget().queryParam( "access_token", orgAdminToken.getAccessToken() )
-            .queryParam( "app_delete_confirm", "confirm_delete_of_application_and_data" )
+            .queryParam( "application_identifier", appToDeleteId )
             .request()
             .delete();
         Assert.assertEquals( "Error must be 404", 404, response1.getStatus() );
@@ -223,7 +223,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         clientSetup.getRestClient().management().orgs()
             .org( orgName ).apps().app( appToDeleteName ).getTarget()
             .queryParam( "access_token", orgAdminToken.getAccessToken() )
-            .queryParam("app_delete_confirm", "confirm_delete_of_application_and_data")
+            .queryParam("application_identifier", appToDeleteName)
             .request()
             .delete();
 
@@ -303,7 +303,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         clientSetup.getRestClient().management().orgs()
             .org( orgName ).apps().app( appToDeleteId.toString() ).getTarget()
             .queryParam( "access_token", orgAdminToken.getAccessToken() )
-            .queryParam("app_delete_confirm", "confirm_delete_of_application_and_data")
+            .queryParam("application_identifier", appToDeleteId)
             .request()
             .delete();
 
@@ -345,7 +345,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         clientSetup.getRestClient().management()
             .orgs().org( orgName ).apps().app( appToDeleteId.toString() ).getTarget()
             .queryParam( "access_token", orgAdminToken.getAccessToken() )
-            .queryParam( "app_delete_confirm", "confirm_delete_of_application_and_data" )
+            .queryParam( "application_identifier", appToDeleteId )
             .request()
             .delete();
 
@@ -358,7 +358,7 @@ public class ApplicationDeleteIT extends AbstractRestIT {
         final Response response = clientSetup.getRestClient().management()
             .orgs().org( orgName ).apps().app( newAppId.toString() ).getTarget()
             .queryParam( "access_token", orgAdminToken.getAccessToken() )
-            .queryParam( "app_delete_confirm", "confirm_delete_of_application_and_data" )
+            .queryParam( "application_identifier", newAppId )
             .request()
             .delete();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/12d344ff/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
index 48daba4..66c94ce 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
@@ -18,7 +18,11 @@ package org.apache.usergrid.rest.applications.collection;
 
 
 import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
+import org.apache.usergrid.rest.test.resource.model.ApiResponse;
 import org.apache.usergrid.rest.test.resource.model.Collection;
 import org.apache.usergrid.rest.test.resource.model.Entity;
 import org.apache.usergrid.rest.test.resource.model.QueryParameters;
@@ -30,6 +34,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.ClientErrorException;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.*;
 
@@ -226,6 +234,47 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
     }
 
+    @Test
+    public void testDefaultCollectionReturning() throws IOException {
+
+        ApiResponse usersDefaultCollection = this.app().get();
+
+        LinkedHashMap collectionHashMap = ( LinkedHashMap ) usersDefaultCollection.getEntity().get( "metadata" );
+
+        //make sure you have all the other default collections once you have users in place.
+        Set<String> system_collections = Schema.getDefaultSchema().getCollectionNames( Application.ENTITY_TYPE );
+        for(String collectionName : system_collections){
+            assertNotSame( null,((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName ));
+        }
+    }
+
+    @Test
+    public void testNewlyCreatedCollectionReturnWhenEmpty(){
+        String collectionName =  "testDefaultCollectionReturnings";
+
+        Map<String,Object> payload = new HashMap(  );
+        payload.put( "hello","test" );
+        ApiResponse testEntity = this.app().collection( collectionName ).post( payload );
+
+        //Verify that the below collection actually does exist.
+        ApiResponse usersDefaultCollection = this.app().get();
+
+        LinkedHashMap collectionHashMap = ( LinkedHashMap ) usersDefaultCollection.getEntity().get( "metadata" );
+
+        assertNotSame( null,((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName.toLowerCase() ));
+
+        this.refreshIndex();
+        this.app().collection( collectionName ).entity( testEntity.getEntity().getUuid() ).delete();
+
+
+        //Verify that the collection still exists despite deleting its only entity.)
+        usersDefaultCollection = this.app().get();
+
+        collectionHashMap = ( LinkedHashMap ) usersDefaultCollection.getEntity().get( "metadata" );
+
+        assertNotSame( null,((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName ));
+    }
+
 
     /**
      * Test to verify "name property returns twice in AppServices response" is fixed.


[10/10] usergrid git commit: Merge commit 'refs/pull/487/head' of github.com:apache/usergrid into release-2.1.1

Posted by mr...@apache.org.
Merge commit 'refs/pull/487/head' of github.com:apache/usergrid into release-2.1.1


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ee356ec7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ee356ec7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ee356ec7

Branch: refs/heads/release-2.1.1
Commit: ee356ec75da3c72a0cd51a501de62b9bde3bcd58
Parents: 583c104 5dfbcca
Author: Michael Russo <mr...@apigee.com>
Authored: Thu Feb 25 11:55:42 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Feb 25 11:55:42 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   4 +-
 .../asyncevents/AmazonAsyncEventService.java    | 844 -------------------
 .../asyncevents/AsyncEventService.java          |   5 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 829 ++++++++++++++++++
 .../asyncevents/AsyncIndexProvider.java         |   4 +-
 .../asyncevents/EventBuilder.java               |  12 +-
 .../asyncevents/EventBuilderImpl.java           |  15 +-
 .../asyncevents/IndexDocNotFoundException.java  |  37 +
 .../asyncevents/model/AsyncEvent.java           |   2 -
 .../asyncevents/model/EdgeIndexEvent.java       |  70 --
 .../asyncevents/model/EntityIndexEvent.java     |  54 --
 .../corepersistence/index/IndexServiceImpl.java |   4 +-
 .../read/search/CandidateEntityFilter.java      |  10 +-
 .../index/AmazonAsyncEventServiceTest.java      | 103 ---
 .../index/AsyncEventServiceImplTest.java        | 103 +++
 .../index/AsyncIndexServiceTest.java            |   3 +-
 .../index/impl/EsIndexProducerImpl.java         |   5 +-
 .../usergrid/persistence/queue/QueueFig.java    |   4 +
 .../usergrid/services/ServiceManager.java       |   5 +-
 19 files changed, 1003 insertions(+), 1110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee356ec7/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------


[03/10] usergrid git commit: Reduce SQS hop for entity write/update indexing events.

Posted by mr...@apache.org.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
deleted file mode 100644
index 625a8fd..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.junit.Rule;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.corepersistence.TestIndexModule;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.asyncevents.AmazonAsyncEventService;
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.impl.EsRunner;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-
-import com.google.inject.Inject;
-
-import net.jcip.annotations.NotThreadSafe;
-
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
-@RunWith( EsRunner.class )
-@UseModules( { TestIndexModule.class } )
-@NotThreadSafe
-public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
-
-
-
-    @Rule
-    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
-
-
-
-    @Inject
-    public QueueManagerFactory queueManagerFactory;
-
-    @Inject
-    public IndexProcessorFig indexProcessorFig;
-
-    @Inject
-    public QueueFig queueFig;
-
-
-    @Inject
-    public MetricsFactory metricsFactory;
-
-    @Inject
-    public RxTaskScheduler rxTaskScheduler;
-
-    @Inject
-    public EventBuilder eventBuilder;
-
-    @Inject
-    public IndexProducer indexProducer;
-
-    @Inject
-    public IndexLocationStrategyFactory indexLocationStrategyFactory;
-
-    @Inject
-    public MapManagerFactory mapManagerFactory;
-
-
-    @Inject
-    public EntityIndexFactory entityIndexFactory;
-
-    @Override
-    protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig,  rxTaskScheduler );
-    }
-
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
new file mode 100644
index 0000000..c915464
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
+public class AsyncEventServiceImplTest extends AsyncIndexServiceTest {
+
+
+
+    @Rule
+    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
+
+
+    @Inject
+    public QueueManagerFactory queueManagerFactory;
+
+    @Inject
+    public IndexProcessorFig indexProcessorFig;
+
+    @Inject
+    public QueueFig queueFig;
+
+
+    @Inject
+    public MetricsFactory metricsFactory;
+
+    @Inject
+    public RxTaskScheduler rxTaskScheduler;
+
+    @Inject
+    public EventBuilder eventBuilder;
+
+    @Inject
+    public IndexProducer indexProducer;
+
+    @Inject
+    public IndexLocationStrategyFactory indexLocationStrategyFactory;
+
+    @Inject
+    public MapManagerFactory mapManagerFactory;
+
+
+    @Inject
+    public EntityIndexFactory entityIndexFactory;
+
+    @Override
+    protected AsyncEventService getAsyncEventService() {
+        return  new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig,  rxTaskScheduler );
+    }
+
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 74f9ce0..12a33cf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -55,7 +55,6 @@ import com.google.inject.Inject;
 import net.jcip.annotations.NotThreadSafe;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 
 import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
@@ -145,7 +144,7 @@ public abstract class AsyncIndexServiceTest {
 
 
         //queue up processing
-        asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity );
+        asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity, 0);
 
 
         final EntityIndex EntityIndex =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 4782bea..62102b4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.index.impl;
 
 
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.codahale.metrics.Histogram;
@@ -130,7 +131,9 @@ public class EsIndexProducerImpl implements IndexProducer {
         final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
 
         //buffer into the max size we can send ES and fire them all off until we're completed
-        final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
+        final Observable<BulkRequestBuilder> requests = batchOps
+            .buffer(indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS)
+
             //flatten the buffer into a single batch execution
             .flatMap(individualOps -> Observable.from(individualOps)
                 //collect them

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index cdab3e0..88ad3ff 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -82,4 +82,8 @@ public interface QueueFig extends GuicyFig {
     @Key( "usergrid.queue.visibilityTimeout" )
     @Default("5000") // 5 seconds
     int getVisibilityTimeout();
+
+    @Key( "usergrid.queue.localquorum.timeout")
+    @Default("3000") // 3 seconds
+    int getLocalQuorumTimeout();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java
index c439b49..0507818 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java
@@ -292,7 +292,10 @@ public class ServiceManager {
             }
         }
         catch ( ClassNotFoundException e1 ) {
-            logger.error("Could not load class", e1);
+            if(logger.isTraceEnabled()){
+                logger.trace("Could not find class", e1);
+            }
+
         }
         return null;
     }


[05/10] usergrid git commit: Ensure indexBatch works with the new model for indexing.

Posted by mr...@apache.org.
Ensure indexBatch works with the new model for indexing.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3c399e79
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3c399e79
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3c399e79

Branch: refs/heads/release-2.1.1
Commit: 3c399e790e16609bdc4a76853fcbc5ad562e8979
Parents: b4634dc
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Feb 19 23:06:30 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Feb 19 23:06:30 2016 -0800

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 68 +++++++++++++------
 .../asyncevents/model/AsyncEvent.java           |  2 -
 .../asyncevents/model/EdgeIndexEvent.java       | 70 --------------------
 .../asyncevents/model/EntityIndexEvent.java     | 54 ---------------
 4 files changed, 49 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index e101761..dac3651 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -298,12 +297,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
             } catch (ClassCastException cce) {
                 logger.error("Failed to deserialize message body", cce);
-                return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
             }
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
             }
 
             final AsyncEvent thisEvent = event;
@@ -312,6 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 logger.debug("Processing {} event", event);
             }
 
+            IndexOperationMessage indexOperationMessage = null;
             try {
 
                 // deletes are 2-part, actual IO to delete data, then queue up a de-index
@@ -332,7 +332,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 // this is the main event that pulls the index doc from map persistence and hands to the index producer
                 else if (event instanceof ElasticsearchIndexEvent) {
 
-                    handleIndexOperation((ElasticsearchIndexEvent) event);
+                    indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);
 
                 } else {
 
@@ -341,20 +341,20 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
 
                 //return type that can be indexed and ack'd later
-                return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime());
+                return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());
 
             } catch (IndexDocNotFoundException e){
 
                 // this exception is throw when we wait before trying quorum read on map persistence.
                 // return empty event result so the event's message doesn't get ack'd
                 logger.info(e.getMessage());
-                return new IndexEventResult(Optional.absent(), event.getCreationTime());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
 
             } catch (Exception e) {
 
                 // if the event fails to process, log the message and return empty event result so it doesn't get ack'd
                 logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
-                return new IndexEventResult(Optional.absent(), event.getCreationTime());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
             }
         });
 
@@ -407,6 +407,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public void queueDeleteEdge(final ApplicationScope applicationScope,
                                 final Edge edge) {
 
+        // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
         offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
     }
 
@@ -471,7 +472,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offerTopic( elasticsearchIndexEvent );
     }
 
-    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+    public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
          Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 
         final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
@@ -525,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
 
         //now execute it
-        indexProducer.put(indexOperationMessage).toBlocking().last();
+        return indexOperationMessage;
 
     }
 
@@ -568,6 +569,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     @Override
     public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
 
+        // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
     }
 
@@ -699,7 +701,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                                  try {
                                                      List<IndexEventResult> indexEventResults = callEventHandlers( messages );
-                                                     List<QueueMessage> messagesToAck = ackMessages( indexEventResults );
+                                                     List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
 
                                                      if ( messagesToAck == null || messagesToAck.size() == 0 ) {
                                                          logger.error(
@@ -738,17 +740,21 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * @param indexEventResults
      * @return
      */
-    private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) {
+    private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
         //if nothing came back then return null
         if(indexEventResults==null){
             return null;
         }
+        IndexOperationMessage combined = new IndexOperationMessage();
 
         // stream the messages to record the cycle time
-        return indexEventResults.stream()
+        List<QueueMessage> queueMessages = indexEventResults.stream()
             .map(indexEventResult -> {
                 //record the cycle time
                 messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+                if(indexEventResult.getIndexOperationMessage().isPresent()){
+                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
+                }
                 return indexEventResult;
             })
             // filter out messages that are not present, they were not processed and put into the results
@@ -756,34 +762,58 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             .map(result -> result.getQueueMessage().get())
             // collect
             .collect(Collectors.toList());
+
+        // sumbit the requests to Elasticsearch
+        indexProducer.put(combined).toBlocking().last();
+
+        return queueMessages;
     }
 
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
-        //change to id scope to avoid serialization issues
-        offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
+
+        EntityIndexOperation entityIndexOperation =
+            new EntityIndexOperation( applicationScope, id, updatedSince);
+
+        queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
 
-        List batch = new ArrayList<EdgeScope>();
+        IndexOperationMessage batch = new IndexOperationMessage();
+
         for ( EdgeScope e : edges){
-            //change to id scope to avoid serialization issues
-            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
+
+            EntityIndexOperation entityIndexOperation =
+                new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
+
+            IndexOperationMessage indexOperationMessage =
+                eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+
+            if (indexOperationMessage != null){
+                batch.ingest(indexOperationMessage);
+            }
+
         }
-        offerBatch( batch );
+
+        queueIndexOperationMessage(batch);
     }
 
 
     public class IndexEventResult{
+        private final Optional<IndexOperationMessage> indexOperationMessage;
         private final Optional<QueueMessage> queueMessage;
         private final long creationTime;
 
-        public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){
+        public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){
 
             this.queueMessage = queueMessage;
             this.creationTime = creationTime;
+            this.indexOperationMessage = indexOperationMessage;
         }
 
+        public Optional<IndexOperationMessage> getIndexOperationMessage() {
+            return indexOperationMessage;
+        }
 
         public Optional<QueueMessage> getQueueMessage() {
             return queueMessage;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 5f86410..57b5812 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -40,9 +40,7 @@ import org.apache.usergrid.persistence.queue.QueueFig;
 @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
 @JsonSubTypes( {
     @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
-    @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
     @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
-    @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
     @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
     @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
 } )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
deleted file mode 100644
index 6164dce..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public final class EdgeIndexEvent
-    extends AsyncEvent {
-
-
-    @JsonProperty
-    protected ApplicationScope applicationScope;
-
-    @JsonProperty
-    protected Id entityId;
-
-    @JsonProperty
-    protected Edge edge;
-
-    /**
-     * Needed by jackson
-     */
-    public EdgeIndexEvent() {
-        super();
-    }
-
-    public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) {
-        super(sourceRegion);
-        this.applicationScope = applicationScope;
-        this.entityId = entityId;
-        this.edge = edge;
-    }
-
-
-    public ApplicationScope getApplicationScope() {
-        return applicationScope;
-    }
-
-
-    public Edge getEdge() {
-        return edge;
-    }
-
-
-    public Id getEntityId() {
-        return entityId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
deleted file mode 100644
index 7e8184b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-
-
-public final class EntityIndexEvent extends AsyncEvent {
-
-
-    @JsonProperty
-    protected EntityIdScope entityIdScope;
-
-    @JsonProperty
-    private long updatedAfter;
-
-    public EntityIndexEvent() {
-        super();
-    }
-
-    public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) {
-        super(sourceRegion);
-        this.entityIdScope = entityIdScope;
-        this.updatedAfter = updatedAfter;
-    }
-
-
-    public long getUpdatedAfter() {
-        return updatedAfter;
-    }
-
-
-    public EntityIdScope getEntityIdScope() {
-        return entityIdScope;
-    }
-}


[09/10] usergrid git commit: Merge commit 'refs/pull/488/head' of github.com:apache/usergrid into release-2.1.1

Posted by mr...@apache.org.
Merge commit 'refs/pull/488/head' of github.com:apache/usergrid into release-2.1.1


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/583c1049
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/583c1049
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/583c1049

Branch: refs/heads/release-2.1.1
Commit: 583c10492ef02613ea5e6ce0fefe0bba9b53189e
Parents: 2e29636 68b079a
Author: Michael Russo <mr...@apigee.com>
Authored: Thu Feb 25 11:55:35 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Feb 25 11:55:35 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 27 +++++++++-
 .../applications/ApplicationResource.java       | 27 ++++++----
 .../rest/applications/ApplicationDeleteIT.java  | 12 ++---
 .../collection/CollectionsResourceIT.java       | 56 ++++++++++++++++++++
 4 files changed, 105 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[08/10] usergrid git commit: Add additional test for validating admin user access.

Posted by mr...@apache.org.
Add additional test for validating admin user access.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2e296361
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2e296361
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2e296361

Branch: refs/heads/release-2.1.1
Commit: 2e296361ab1b0afb8e762a3b7d16297944652f87
Parents: dfcec88
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Feb 22 18:27:35 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Feb 25 11:54:28 2016 -0800

----------------------------------------------------------------------
 .../usergrid/rest/management/AdminUsersIT.java  | 35 ++++++++++++++++++++
 1 file changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2e296361/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
index 423af29..e294556 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
@@ -408,6 +408,41 @@ public class AdminUsersIT extends AbstractRestIT {
 
     }
 
+    @Test
+    public void updateManagementUserWrongAdminToken() throws Exception {
+
+        Organization newOrg = createOrgPayload( "updateManagementUserWrongAdminToken", null );
+        Organization orgReturned = clientSetup.getRestClient().management().orgs().post( newOrg );
+        assertNotNull( orgReturned.getOwner() );
+
+        // add a new management user to the org for the purpose of a 'wrong' user trying update others
+        Entity adminUserPayload = new Entity();
+        String wrongAdminUsername = "wrongAdminUser"+UUIDUtils.newTimeUUID();
+        adminUserPayload.put( "username", wrongAdminUsername );
+        adminUserPayload.put( "name", wrongAdminUsername );
+        adminUserPayload.put( "email", wrongAdminUsername+"@usergrid.com" );
+        adminUserPayload.put( "password", wrongAdminUsername );
+        management().orgs().org( clientSetup.getOrganizationName() ).users().post(User.class ,adminUserPayload );
+
+
+        // get token of the newly added wrongAdminUser
+        Token wrongAdminToken = management.token().get(wrongAdminUsername, wrongAdminUsername);
+        assertNotNull(wrongAdminToken);
+        management.token().setToken( wrongAdminToken );
+
+        try{
+            //Add a property to management user
+            Entity userProperty = new Entity(  ).chainPut( "company","usergrid" );
+            management().users().user( newOrg.getUsername() ).put( userProperty );
+
+        } catch( UniformInterfaceException e ){
+
+            int status = e.getResponse().getStatus();
+            assertEquals(401, status);
+        }
+
+    }
+
 
 
 


[06/10] usergrid git commit: Add additional comments.

Posted by mr...@apache.org.
Add additional comments.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5dfbcca2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5dfbcca2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5dfbcca2

Branch: refs/heads/release-2.1.1
Commit: 5dfbcca2a60ae1463fb6f6e5a9a05225da4c2d85
Parents: 3c399e7
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Feb 19 23:10:40 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Feb 19 23:10:40 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AsyncEventServiceImpl.java      | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5dfbcca2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index dac3651..7a71410 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -340,7 +340,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 }
 
 
-                //return type that can be indexed and ack'd later
+                // returning indexOperationMessage will send that indexOperationMessage to the index producer
+                // if no exception happens and the QueueMessage is returned in these results, it will get ack'd
                 return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());
 
             } catch (IndexDocNotFoundException e){
@@ -352,7 +353,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
             } catch (Exception e) {
 
-                // if the event fails to process, log the message and return empty event result so it doesn't get ack'd
+                // if the event fails to process, log and return empty message result so it doesn't get ack'd
                 logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
                 return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
             }


[07/10] usergrid git commit: Allow superuser to access @RequireAdminUserAccess

Posted by mr...@apache.org.
Allow superuser to access @RequireAdminUserAccess

Conflicts:
	stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
	stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/dfcec88d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/dfcec88d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/dfcec88d

Branch: refs/heads/release-2.1.1
Commit: dfcec88d41079f51bd393581b6c5419d9995b0e4
Parents: 3ea2b3c
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Feb 22 18:02:58 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Feb 25 11:54:08 2016 -0800

----------------------------------------------------------------------
 .../rest/management/users/UserResource.java     |  2 +-
 .../security/SecuredResourceFilterFactory.java  |  2 +-
 .../usergrid/rest/management/AdminUsersIT.java  | 68 ++++++++++++++------
 3 files changed, 52 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/dfcec88d/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
index ed39c31..7ca6418 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
@@ -89,7 +89,7 @@ public class UserResource extends AbstractContextResource {
         return getSubResource( OrganizationsResource.class ).init( user );
     }
 
-
+    @RequireAdminUserAccess
     @PUT
     @JSONP
     @Produces({MediaType.APPLICATION_JSON, "application/javascript"})

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dfcec88d/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java b/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
index 531d355..0514dca 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
@@ -368,7 +368,7 @@ public class SecuredResourceFilterFactory implements DynamicFeature {
                 if (logger.isTraceEnabled()) {
                     logger.trace("AdminUserFilter.authorize");
                 }
-                if (!isUser( getUserIdentifier() )) {
+                if (!isUser( getUserIdentifier() ) && !isServiceAdmin() ) {
                     throw mappableSecurityException( "unauthorized", "No admin user access authorized" );
                 }
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dfcec88d/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
index cf27644..423af29 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/AdminUsersIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.usergrid.rest.management;
 
+import com.sun.jersey.api.client.UniformInterfaceException;
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.usergrid.management.MockImapClient;
 import org.apache.usergrid.persistence.core.util.StringUtils;
@@ -25,7 +26,6 @@ import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.endpoints.mgmt.ManagementResource;
 import org.apache.usergrid.rest.test.resource.model.*;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.jvnet.mock_javamail.Mailbox;
 
@@ -340,15 +340,11 @@ public class AdminUsersIT extends AbstractRestIT {
         }
     }
 
-    /**
-     * Update the current management user and make sure the change persists
-     * @throws Exception
-     */
-    @Ignore("Pending new feature https://issues.apache.org/jira/browse/USERGRID-1127")
     @Test
-    public void updateManagementUser() throws Exception {
+    public void updateManagementUserNoToken() throws Exception {
 
-        Organization newOrg = createOrgPayload( "updateManagementUser", null );
+
+        Organization newOrg = createOrgPayload( "updateManagementUserNoToken", null );
 
 
         Organization orgReturned = clientSetup.getRestClient().management().orgs().post( newOrg );
@@ -357,28 +353,64 @@ public class AdminUsersIT extends AbstractRestIT {
 
         //Add a property to management user
         Entity userProperty = new Entity(  ).chainPut( "company","usergrid" );
-        management().users().user( newOrg.getUsername() ).put( userProperty );
 
-        Entity userUpdated = updateAdminUser( userProperty, orgReturned );
+        try{
+            management().users().user( newOrg.getUsername() ).put( userProperty );
+        } catch( UniformInterfaceException e ){
 
-        assertEquals( "usergrid",userUpdated.getAsString( "company" ) );
+            int status = e.getResponse().getStatus();
+            assertEquals(401, status);
+        }
+
+    }
+
+    @Test
+    public void updateManagementUserSuperuserToken() throws Exception {
+
+
+        Organization newOrg = createOrgPayload( "updateManagementUserSuperuserToken", null );
+
+
+        Organization orgReturned = clientSetup.getRestClient().management().orgs().post( newOrg );
+
+        assertNotNull( orgReturned.getOwner() );
+
+        //Add a property to management user
+        Entity userProperty = new Entity(  ).chainPut( "company","usergrid" );
 
-        //Update property with new management value.
-        userProperty = new Entity(  ).chainPut( "company","Apigee" );
+        management.token().setToken( clientSetup.getSuperuserToken());
+        management().users().user( newOrg.getUsername() ).put( userProperty );
 
-        userUpdated = updateAdminUser( userProperty, orgReturned);
 
-        assertEquals( "Apigee",userUpdated.getAsString( "company" ) );
     }
 
-    private Entity updateAdminUser(Entity userProperty, Organization organization){
-        management().users().user( organization.getUsername() ).put( userProperty );
+    @Test
+    public void updateManagementUserAdminToken() throws Exception {
+
+        Organization newOrg = createOrgPayload( "updateManagementUserAdminToken", null );
+
+
+        Organization orgReturned = clientSetup.getRestClient().management().orgs().post( newOrg );
+
+        assertNotNull( orgReturned.getOwner() );
 
-        return management().users().user( organization.getUsername() ).get();
+        String orgName = orgReturned.getName();
+
+        //Add a property to management user
+        Entity userProperty = new Entity(  ).chainPut( "company","usergrid" );
+
+        User adminUser = orgReturned.getOwner();
+
+        Token adminToken = management.token().get(adminUser.getUsername(), orgName);
+        assertNotNull(adminToken);
+        management.token().setToken( adminToken );
+        management().users().user( newOrg.getUsername() ).put( userProperty );
 
     }
 
 
+
+
     /**
      * Check that we send the reactivate email to the user after calling the reactivate endpoint.
      * @throws Exception


[04/10] usergrid git commit: Reduce SQS hop for entity write/update indexing events.

Posted by mr...@apache.org.
Reduce SQS hop for entity write/update indexing events.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b4634dc4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b4634dc4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b4634dc4

Branch: refs/heads/release-2.1.1
Commit: b4634dc42f767a982892362bbd4aa66059bf1998
Parents: d4c7a3c
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Feb 19 13:35:48 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Feb 19 13:35:48 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   4 +-
 .../asyncevents/AmazonAsyncEventService.java    | 844 -------------------
 .../asyncevents/AsyncEventService.java          |   5 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 798 ++++++++++++++++++
 .../asyncevents/AsyncIndexProvider.java         |   4 +-
 .../asyncevents/EventBuilder.java               |  12 +-
 .../asyncevents/EventBuilderImpl.java           |  15 +-
 .../asyncevents/IndexDocNotFoundException.java  |  37 +
 .../corepersistence/index/IndexServiceImpl.java |   4 +-
 .../read/search/CandidateEntityFilter.java      |  10 +-
 .../index/AmazonAsyncEventServiceTest.java      | 103 ---
 .../index/AsyncEventServiceImplTest.java        | 103 +++
 .../index/AsyncIndexServiceTest.java            |   3 +-
 .../index/impl/EsIndexProducerImpl.java         |   5 +-
 .../usergrid/persistence/queue/QueueFig.java    |   4 +
 .../usergrid/services/ServiceManager.java       |   5 +-
 16 files changed, 972 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6c2ef0b..b677f79 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -590,7 +590,7 @@ public class CpEntityManager implements EntityManager {
 
         // update in all containing collections and connection indexes
 
-        indexService.queueEntityIndexUpdate( applicationScope, cpEntity );
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0);
     }
 
 
@@ -1107,7 +1107,7 @@ public class CpEntityManager implements EntityManager {
 
         //Adding graphite metrics
 
-        indexService.queueEntityIndexUpdate(applicationScope, cpEntity);
+        indexService.queueEntityIndexUpdate(applicationScope, cpEntity, 0);
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
deleted file mode 100644
index 00dc69a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents;
-
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.usergrid.persistence.index.impl.*;
-import org.elasticsearch.action.index.IndexRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.schedulers.Schedulers;
-
-
-/**
- * TODO, this whole class is becoming a nightmare.  We need to remove all consume from this class and refactor it into the following manner.
- *
- * 1. Produce.  Keep the code in the handle as is
- * 2. Consume:  Move the code into a refactored system
- * 2.1 A central dispatcher
- * 2.2 An interface that produces an observable of type BatchOperation.  Any handler will be refactored into it's own
- *      impl that will then emit a stream of batch operations to perform
- * 2.3 The central dispatcher will then subscribe to these events and merge them.  Handing them off to a batch handler
- * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
- * 2.5 The receive batch handler will execute the batch operations
- *
- * TODO determine how we error handle?
- *
- */
-@Singleton
-public class AmazonAsyncEventService implements AsyncEventService {
-
-
-    private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
-
-    // SQS maximum receive messages is 10
-    public int MAX_TAKE = 10;
-    public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
-
-    private final QueueManager queue;
-    private final IndexProcessorFig indexProcessorFig;
-    private final QueueFig queueFig;
-    private final IndexProducer indexProducer;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
-    private final EntityIndexFactory entityIndexFactory;
-    private final EventBuilder eventBuilder;
-    private final RxTaskScheduler rxTaskScheduler;
-
-    private final Timer readTimer;
-    private final Timer writeTimer;
-    private final Timer ackTimer;
-
-    /**
-     * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
-     */
-    private final Object mutex = new Object();
-
-    private final Counter indexErrorCounter;
-    private final AtomicLong counter = new AtomicLong();
-    private final AtomicLong inFlight = new AtomicLong();
-    private final Histogram messageCycle;
-    private final MapManager esMapPersistence;
-
-    //the actively running subscription
-    private List<Subscription> subscriptions = new ArrayList<>();
-
-
-    @Inject
-    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
-                                    final IndexProcessorFig indexProcessorFig,
-                                    final IndexProducer indexProducer,
-                                    final MetricsFactory metricsFactory,
-                                    final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                                    final EntityIndexFactory entityIndexFactory,
-                                    final EventBuilder eventBuilder,
-                                    final MapManagerFactory mapManagerFactory,
-                                    final QueueFig queueFig,
-                                    @EventExecutionScheduler
-                                    final RxTaskScheduler rxTaskScheduler ) {
-        this.indexProducer = indexProducer;
-
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
-        this.entityIndexFactory = entityIndexFactory;
-        this.eventBuilder = eventBuilder;
-
-        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
-
-        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
-
-        this.rxTaskScheduler = rxTaskScheduler;
-
-        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
-        this.queue = queueManagerFactory.getQueueManager(queueScope);
-
-        this.indexProcessorFig = indexProcessorFig;
-        this.queueFig = queueFig;
-
-        this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
-        this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
-        this.ackTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.ack");
-        this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error");
-        this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle");
-
-
-        //wire up the gauge of inflight message
-        metricsFactory.addGauge(AmazonAsyncEventService.class, "async-event.inflight", new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return inFlight.longValue();
-            }
-        });
-
-        start();
-    }
-
-
-    /**
-     * Offer the EntityIdScope to SQS
-     */
-    private void offer(final Serializable operation) {
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessage( operation );
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        } finally {
-            timer.stop();
-        }
-    }
-
-
-    private void offerTopic( final Serializable operation ) {
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessageToTopic( operation );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to queue message", e );
-        }
-        finally {
-            timer.stop();
-        }
-    }
-
-    private void offerBatch(final List operations){
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessages(operations);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        } finally {
-            timer.stop();
-        }
-    }
-
-
-    /**
-     * Take message from SQS
-     */
-    private List<QueueMessage> take() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return queue.getMessages(MAX_TAKE,
-                    indexProcessorFig.getIndexQueueVisibilityTimeout(),
-                    indexProcessorFig.getIndexQueueTimeout(),
-                    AsyncEvent.class);
-        }
-        //stop our timer
-        finally {
-            timer.stop();
-        }
-    }
-
-
-
-    /**
-     * Ack message in SQS
-     */
-    public void ack(final List<QueueMessage> messages) {
-
-        final Timer.Context timer = this.ackTimer.time();
-
-        try{
-            queue.commitMessages( messages );
-
-            //decrement our in-flight counter
-            inFlight.decrementAndGet();
-
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
-        }finally {
-            timer.stop();
-        }
-
-
-    }
-
-    /**
-     * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
-     * @param messages
-     * @return
-     */
-    private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("callEventHandlers with {} message", messages.size());
-        }
-
-        Stream<IndexEventResult> indexEventResults = messages.stream().map(message -> {
-            AsyncEvent event = null;
-            try {
-                event = (AsyncEvent) message.getBody();
-            } catch (ClassCastException cce) {
-                logger.error("Failed to deserialize message body", cce);
-            }
-
-            if (event == null) {
-                logger.error("AsyncEvent type or event is null!");
-                return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),
-                    System.currentTimeMillis());
-            }
-
-            final AsyncEvent thisEvent = event;
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Processing {} event", event);
-            }
-
-            try {
-                //check for empty sets if this is true
-                boolean validateEmptySets = true;
-                Observable<IndexOperationMessage> indexoperationObservable;
-                //merge each operation to a master observable;
-                if ( event instanceof EdgeDeleteEvent ) {
-                    indexoperationObservable = handleEdgeDelete( message );
-                }
-                else if ( event instanceof EdgeIndexEvent ) {
-                    indexoperationObservable = handleEdgeIndex( message );
-                }
-                else if ( event instanceof EntityDeleteEvent ) {
-                    indexoperationObservable = handleEntityDelete( message );
-                    validateEmptySets = false; // do not check this one for an empty set b/c it can be empty
-
-                }
-                else if ( event instanceof EntityIndexEvent ) {
-                    indexoperationObservable = handleEntityIndexUpdate( message );
-                }
-                else if ( event instanceof InitializeApplicationIndexEvent ) {
-                    //does not return observable
-                    handleInitializeApplicationIndex(event, message);
-                    indexoperationObservable = Observable.just(new IndexOperationMessage());
-                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                } else if (event instanceof ElasticsearchIndexEvent) {
-                    handleIndexOperation((ElasticsearchIndexEvent) event);
-                    indexoperationObservable = Observable.just(new IndexOperationMessage());
-                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                } else {
-                    throw new Exception("Unknown EventType");//TODO: print json instead
-                }
-
-                //collect all of the
-                IndexOperationMessage indexOperationMessage = indexoperationObservable
-                    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
-                    .toBlocking().lastOrDefault(null);
-
-                if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
-                    logger.error("Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
-                        message.getStringBody());
-                    throw new Exception("Received empty index sequence.");
-                }
-
-                //return type that can be indexed and ack'd later
-                return new IndexEventResult(Optional.fromNullable(message),
-                    Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
-            } catch (Exception e) {
-                logger.error("Failed to index message: {} {}", message.getMessageId(), message.getStringBody(), e);
-                return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(),
-                    event.getCreationTime());
-            }
-        });
-
-
-        return indexEventResults.collect(Collectors.toList());
-    }
-
-    @Override
-    public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
-        IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
-            applicationScope);
-        offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
-                                       final Entity entity) {
-
-        offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0));
-    }
-
-
-    public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) {
-
-        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
-
-        final AsyncEvent event = ( AsyncEvent ) message.getBody();
-
-        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
-        Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
-
-        final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
-
-
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-        final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
-        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
-        final Id entityId = entityIdScope.getId();
-        final long updatedAfter = entityIndexEvent.getUpdatedAfter();
-
-        final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
-
-        final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
-        return observable;
-    }
-
-
-    @Override
-    public void queueNewEdge(final ApplicationScope applicationScope,
-                             final Entity entity,
-                             final Edge newEdge) {
-
-        EdgeIndexEvent operation = new EdgeIndexEvent(queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge);
-
-        offer( operation );
-    }
-
-    public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) {
-
-        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
-
-        final AsyncEvent event = (AsyncEvent) message.getBody();
-
-        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
-        Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
-
-        final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
-
-        final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
-        final Edge edge = edgeIndexEvent.getEdge();
-
-
-
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
-            entity -> eventBuilder.buildNewEdge(applicationScope, entity, edge));
-        return edgeIndexObservable;
-    }
-
-    @Override
-    public void queueDeleteEdge(final ApplicationScope applicationScope,
-                                final Edge edge) {
-
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
-    }
-
-    public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) {
-
-        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
-
-        final AsyncEvent event = (AsyncEvent) message.getBody();
-
-        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
-        Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
-
-
-        final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
-
-        final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
-        final Edge edge = edgeDeleteEvent.getEdge();
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
-        }
-
-        return eventBuilder.buildDeleteEdge(applicationScope, edge);
-    }
-
-
-    @Override
-    public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
-
-        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
-    }
-
-
-    /**
-     * Queue up an indexOperationMessage for multi region execution
-     * @param indexOperationMessage
-     */
-    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
-
-        // don't try to produce something with nothing
-        if(indexOperationMessage.isEmpty()){
-            return;
-        }
-
-        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
-
-        final UUID newMessageId = UUIDGenerator.newTimeUUID();
-
-        final int expirationTimeInSeconds =
-            ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
-
-        //write to the map in ES
-        esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
-
-
-
-        //now queue up the index message
-
-        final ElasticsearchIndexEvent elasticsearchIndexEvent =
-            new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
-
-        //send to the topic so all regions index the batch
-
-        offerTopic( elasticsearchIndexEvent );
-    }
-
-    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
-         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
-
-        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
-
-        Preconditions.checkNotNull( messageId, "messageId must not be null" );
-
-
-        //load the entity
-
-        final String message = esMapPersistence.getString( messageId.toString() );
-
-        final IndexOperationMessage indexOperationMessage;
-
-        if(message == null){
-            logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level",
-                messageId);
-
-            final String highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
-
-            if(highConsistency == null){
-                logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level",
-                    messageId);
-
-                throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
-            }
-
-            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
-
-        } else{
-            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
-        }
-
-        initializeEntityIndexes(indexOperationMessage);
-
-        //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
-        //so we'll let compaction on column expiration handle deletion
-
-        //read the value from the string
-
-        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
-        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
-
-
-        //now execute it
-        indexProducer.put(indexOperationMessage).toBlocking().last();
-
-    }
-
-    /**
-     *     this method will call initialize for each message, since we are caching the entity indexes,
-     *     we don't worry about aggregating by app id
-     * @param indexOperationMessage
-     */
-    private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
-
-        // create a set so we can have a unique list of appIds for which we call createEntityIndex
-        Set<UUID> appIds = new HashSet<>();
-
-        // loop through all indexRequests and add the appIds to the set
-        indexOperationMessage.getIndexRequests().forEach(req -> {
-            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            appIds.add(appId);
-        });
-
-        // loop through all deindexRequests and add the appIds to the set
-        indexOperationMessage.getDeIndexRequests().forEach(req -> {
-            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            appIds.add(appId);
-        });
-
-        // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
-        appIds.forEach(appId -> {
-                ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
-                entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
-            }
-        );
-    }
-
-
-    @Override
-    public long getQueueDepth() {
-        return queue.getQueueDepth();
-    }
-
-    public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) {
-
-        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
-
-        final AsyncEvent event = (AsyncEvent) message.getBody();
-        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
-        Preconditions.checkArgument( event instanceof EntityDeleteEvent,
-            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
-
-
-        final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
-        final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
-        final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
-
-        if (logger.isDebugEnabled())
-            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
-
-        final EventBuilderImpl.EntityDeleteResults
-            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
-
-
-        // Delete the entities and remove from graph separately
-        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
-
-        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
-
-        return entityDeleteResults.getIndexObservable();
-    }
-
-
-    public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
-        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
-        Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
-
-        final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
-            ( InitializeApplicationIndexEvent ) event;
-
-        final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
-        final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
-        index.initialize();
-    }
-
-    /**
-     * Loop through and start the workers
-     */
-    public void start() {
-        final int count = indexProcessorFig.getWorkerCount();
-
-        for (int i = 0; i < count; i++) {
-            startWorker();
-        }
-    }
-
-
-    /**
-     * Stop the workers
-     */
-    public void stop() {
-        synchronized (mutex) {
-            //stop consuming
-
-            for (final Subscription subscription : subscriptions) {
-                subscription.unsubscribe();
-            }
-        }
-    }
-
-
-    private void startWorker() {
-        synchronized (mutex) {
-
-            Observable<List<QueueMessage>> consumer =
-                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
-                        @Override
-                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
-
-                            //name our thread so it's easy to see
-                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
-
-                            List<QueueMessage> drainList = null;
-
-                            do {
-                                try {
-                                    drainList = take();
-                                    //emit our list in it's entity to hand off to a worker pool
-                                        subscriber.onNext(drainList);
-
-                                    //take since  we're in flight
-                                    inFlight.addAndGet( drainList.size() );
-                                }
-                                catch ( Throwable t ) {
-                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
-
-                                    logger.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
-
-                                    if ( drainList != null ) {
-                                        inFlight.addAndGet( -1 * drainList.size() );
-                                    }
-
-
-                                    try {
-                                        Thread.sleep( sleepTime );
-                                    }
-                                    catch ( InterruptedException ie ) {
-                                        //swallow
-                                    }
-
-                                    indexErrorCounter.inc();
-                                }
-                            }
-                            while ( true );
-                        }
-                    } )        //this won't block our read loop, just reads and proceeds
-                        .flatMap( sqsMessages -> {
-
-                            //do this on a different schedule, and introduce concurrency with flatmap for faster processing
-                            return Observable.just( sqsMessages )
-
-                                             .map( messages -> {
-                                                 if ( messages == null || messages.size() == 0 ) {
-                                                     return null;
-                                                 }
-
-                                                 try {
-                                                     List<IndexEventResult> indexEventResults =
-                                                         callEventHandlers( messages );
-                                                     List<QueueMessage> messagesToAck =
-                                                         submitToIndex( indexEventResults );
-                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
-                                                         logger.error(
-                                                             "No messages came back from the queue operation, should have seen {} messages",
-                                                                 messages.size() );
-                                                         return messagesToAck;
-                                                     }
-                                                     if ( messagesToAck.size() < messages.size() ) {
-                                                         logger.error( "Missing messages from queue post operation",
-                                                             messages, messagesToAck );
-                                                     }
-                                                     //ack each message, but only if we didn't error.
-                                                     ack( messagesToAck );
-                                                     return messagesToAck;
-                                                 }
-                                                 catch ( Exception e ) {
-                                                     logger.error( "failed to ack messages to sqs", e );
-                                                     return null;
-                                                     //do not rethrow so we can process all of them
-                                                 }
-                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
-                            //end flatMap
-                        }, indexProcessorFig.getEventConcurrencyFactor() );
-
-            //start in the background
-
-            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
-
-            subscriptions.add(subscription);
-        }
-    }
-
-    /**
-     * Submit results to index and return the queue messages to be ack'd
-     * @param indexEventResults
-     * @return
-     */
-    private List<QueueMessage> submitToIndex( List<IndexEventResult> indexEventResults) {
-        //if nothing came back then return null
-        if(indexEventResults==null){
-            return null;
-        }
-
-        final IndexOperationMessage combined = new IndexOperationMessage();
-
-        //stream and filer the messages
-        List<QueueMessage> messagesToAck = indexEventResults.stream()
-            .map(indexEventResult -> {
-                //collect into the index submission
-                if (indexEventResult.getIndexOperationMessage().isPresent()) {
-                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
-                }
-                return indexEventResult;
-            })
-                //filter out the ones that need to be ack'd
-            .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
-            .map(indexEventResult -> {
-                //record the cycle time
-                messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
-                return indexEventResult;
-            })
-                //ack after successful completion of the operation.
-            .map(result -> result.getQueueMessage().get())
-            .collect(Collectors.toList());
-
-            queueIndexOperationMessage( combined );
-
-        return messagesToAck;
-    }
-
-    public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
-        //change to id scope to avoid serialization issues
-        offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
-    }
-
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
-
-        List batch = new ArrayList<EdgeScope>();
-        for ( EdgeScope e : edges){
-            //change to id scope to avoid serialization issues
-            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
-        }
-        offerBatch( batch );
-    }
-
-
-    public class IndexEventResult{
-        private final Optional<QueueMessage> queueMessage;
-        private final Optional<IndexOperationMessage> indexOperationMessage;
-        private final long creationTime;
-
-
-        public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){
-
-            this.queueMessage = queueMessage;
-            this.indexOperationMessage = indexOperationMessage;
-
-            this.creationTime = creationTime;
-        }
-
-
-        public Optional<QueueMessage> getQueueMessage() {
-            return queueMessage;
-        }
-
-        public Optional<IndexOperationMessage> getIndexOperationMessage() {
-            return indexOperationMessage;
-        }
-
-        public long getCreationTime() {
-            return creationTime;
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index dbf8996..288fb12 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -48,8 +46,9 @@ public interface AsyncEventService extends ReIndexAction {
      * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
      * @param applicationScope
      * @param entity The entity to index.  Should be fired when an entity is updated
+     * @param updatedAfter
      */
-    void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity);
+    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter);
 
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
new file mode 100644
index 0000000..e101761
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -0,0 +1,798 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.usergrid.persistence.index.impl.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * TODO, this whole class is becoming a nightmare.  We need to remove all consume from this class and refactor it into the following manner.
+ *
+ * 1. Produce.  Keep the code in the handle as is
+ * 2. Consume:  Move the code into a refactored system
+ * 2.1 A central dispatcher
+ * 2.2 An interface that produces an observable of type BatchOperation.  Any handler will be refactored into it's own
+ *      impl that will then emit a stream of batch operations to perform
+ * 2.3 The central dispatcher will then subscribe to these events and merge them.  Handing them off to a batch handler
+ * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
+ * 2.5 The receive batch handler will execute the batch operations
+ *
+ * TODO determine how we error handle?
+ *
+ */
+@Singleton
+public class AsyncEventServiceImpl implements AsyncEventService {
+
+
+    private static final Logger logger = LoggerFactory.getLogger(AsyncEventServiceImpl.class);
+
+    // SQS maximum receive messages is 10
+    public int MAX_TAKE = 10;
+    public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+
+    private final QueueManager queue;
+    private final IndexProcessorFig indexProcessorFig;
+    private final QueueFig queueFig;
+    private final IndexProducer indexProducer;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final EntityIndexFactory entityIndexFactory;
+    private final EventBuilder eventBuilder;
+    private final RxTaskScheduler rxTaskScheduler;
+
+    private final Timer readTimer;
+    private final Timer writeTimer;
+    private final Timer ackTimer;
+
+    /**
+     * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
+     */
+    private final Object mutex = new Object();
+
+    private final Counter indexErrorCounter;
+    private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong inFlight = new AtomicLong();
+    private final Histogram messageCycle;
+    private final MapManager esMapPersistence;
+
+    //the actively running subscription
+    private List<Subscription> subscriptions = new ArrayList<>();
+
+
+    @Inject
+    public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory,
+                                 final IndexProcessorFig indexProcessorFig,
+                                 final IndexProducer indexProducer,
+                                 final MetricsFactory metricsFactory,
+                                 final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                 final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                 final EntityIndexFactory entityIndexFactory,
+                                 final EventBuilder eventBuilder,
+                                 final MapManagerFactory mapManagerFactory,
+                                 final QueueFig queueFig,
+                                 @EventExecutionScheduler
+                                    final RxTaskScheduler rxTaskScheduler ) {
+        this.indexProducer = indexProducer;
+
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.entityIndexFactory = entityIndexFactory;
+        this.eventBuilder = eventBuilder;
+
+        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
+
+        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
+        this.rxTaskScheduler = rxTaskScheduler;
+
+        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+        this.queue = queueManagerFactory.getQueueManager(queueScope);
+
+        this.indexProcessorFig = indexProcessorFig;
+        this.queueFig = queueFig;
+
+        this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write");
+        this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read");
+        this.ackTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.ack");
+        this.indexErrorCounter = metricsFactory.getCounter(AsyncEventServiceImpl.class, "async_event.error");
+        this.messageCycle = metricsFactory.getHistogram(AsyncEventServiceImpl.class, "async_event.message_cycle");
+
+
+        //wire up the gauge of inflight message
+        metricsFactory.addGauge(AsyncEventServiceImpl.class, "async-event.inflight", new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return inFlight.longValue();
+            }
+        });
+
+        start();
+    }
+
+
+    /**
+     * Offer the EntityIdScope to SQS
+     */
+    private void offer(final Serializable operation) {
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessage( operation );
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to queue message", e);
+        } finally {
+            timer.stop();
+        }
+    }
+
+
+    private void offerTopic( final Serializable operation ) {
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessageToTopic( operation );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
+    private void offerBatch(final List operations){
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessages(operations);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to queue message", e);
+        } finally {
+            timer.stop();
+        }
+    }
+
+
+    /**
+     * Take message from SQS
+     */
+    private List<QueueMessage> take() {
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return queue.getMessages(MAX_TAKE,
+                    indexProcessorFig.getIndexQueueVisibilityTimeout(),
+                    indexProcessorFig.getIndexQueueTimeout(),
+                    AsyncEvent.class);
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
+    }
+
+
+
+    /**
+     * Ack message in SQS
+     */
+    public void ack(final List<QueueMessage> messages) {
+
+        final Timer.Context timer = this.ackTimer.time();
+
+        try{
+            queue.commitMessages( messages );
+
+            //decrement our in-flight counter
+            inFlight.decrementAndGet();
+
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }finally {
+            timer.stop();
+        }
+
+
+    }
+
+    /**
+     * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
+     * @param messages
+     * @return
+     */
+    private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("callEventHandlers with {} message", messages.size());
+        }
+
+        Stream<IndexEventResult> indexEventResults = messages.parallelStream().map(message ->
+
+        {
+            AsyncEvent event = null;
+            try {
+                event = (AsyncEvent) message.getBody();
+
+            } catch (ClassCastException cce) {
+                logger.error("Failed to deserialize message body", cce);
+                return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+            }
+
+            if (event == null) {
+                logger.error("AsyncEvent type or event is null!");
+                return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+            }
+
+            final AsyncEvent thisEvent = event;
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Processing {} event", event);
+            }
+
+            try {
+
+                // deletes are 2-part, actual IO to delete data, then queue up a de-index
+                if ( event instanceof EdgeDeleteEvent ) {
+
+                    handleEdgeDelete( message );
+                }
+                // deletes are 2-part, actual IO to delete data, then queue up a de-index
+                else if ( event instanceof EntityDeleteEvent ) {
+
+                    handleEntityDelete( message );
+                }
+                // application initialization has special logic, therefore a special event type
+                else if ( event instanceof InitializeApplicationIndexEvent ) {
+
+                    handleInitializeApplicationIndex(event, message);
+                }
+                // this is the main event that pulls the index doc from map persistence and hands to the index producer
+                else if (event instanceof ElasticsearchIndexEvent) {
+
+                    handleIndexOperation((ElasticsearchIndexEvent) event);
+
+                } else {
+
+                    throw new Exception("Unknown EventType for message: "+ message.getStringBody());
+                }
+
+
+                //return type that can be indexed and ack'd later
+                return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime());
+
+            } catch (IndexDocNotFoundException e){
+
+                // this exception is throw when we wait before trying quorum read on map persistence.
+                // return empty event result so the event's message doesn't get ack'd
+                logger.info(e.getMessage());
+                return new IndexEventResult(Optional.absent(), event.getCreationTime());
+
+            } catch (Exception e) {
+
+                // if the event fails to process, log the message and return empty event result so it doesn't get ack'd
+                logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
+                return new IndexEventResult(Optional.absent(), event.getCreationTime());
+            }
+        });
+
+
+        return indexEventResults.collect(Collectors.toList());
+    }
+
+    @Override
+    public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
+        IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
+            applicationScope);
+        offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
+    }
+
+
+    @Override
+    public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
+                                       final Entity entity, long updatedAfter) {
+
+
+        final EntityIndexOperation entityIndexOperation =
+            new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter);
+
+        final IndexOperationMessage indexMessage =
+            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+
+        queueIndexOperationMessage( indexMessage );
+
+    }
+
+
+    @Override
+    public void queueNewEdge(final ApplicationScope applicationScope,
+                             final Entity entity,
+                             final Edge newEdge) {
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final IndexOperationMessage indexMessage = ecm.load( entity.getId() )
+            .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) )
+            .toBlocking().lastOrDefault(null);
+
+        queueIndexOperationMessage( indexMessage );
+
+    }
+
+
+    @Override
+    public void queueDeleteEdge(final ApplicationScope applicationScope,
+                                final Edge edge) {
+
+        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
+    }
+
+    public void handleEdgeDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
+        Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
+
+
+        final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
+
+        final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
+        final Edge edge = edgeDeleteEvent.getEdge();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
+        }
+
+        IndexOperationMessage indexMessage =
+            eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
+
+        queueIndexOperationMessage(indexMessage);
+
+    }
+
+
+
+    /**
+     * Queue up an indexOperationMessage for multi region execution
+     * @param indexOperationMessage
+     */
+    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+        // don't try to produce something with nothing
+        if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
+            return;
+        }
+
+        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
+
+        final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+        final int expirationTimeInSeconds =
+            ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
+
+        //write to the map in ES
+        esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
+
+
+
+        //now queue up the index message
+
+        final ElasticsearchIndexEvent elasticsearchIndexEvent =
+            new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
+
+        //send to the topic so all regions index the batch
+
+        offerTopic( elasticsearchIndexEvent );
+    }
+
+    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+        Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+        //load the entity
+
+        final String message = esMapPersistence.getString( messageId.toString() );
+
+        final IndexOperationMessage indexOperationMessage;
+
+        if(message == null) {
+
+           if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
+
+               logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level",
+                   messageId);
+
+               final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+
+               if (highConsistency == null) {
+                   logger.error("Unable to find the ES batch with id {} to process at a higher consistency level",
+                       messageId);
+
+                   throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId);
+               }
+
+               indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
+
+           } else{
+
+               throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
+
+           }
+
+        } else{
+            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
+        }
+
+        initializeEntityIndexes(indexOperationMessage);
+
+        //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
+        //so we'll let compaction on column expiration handle deletion
+
+        //read the value from the string
+
+        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
+        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
+
+
+        //now execute it
+        indexProducer.put(indexOperationMessage).toBlocking().last();
+
+    }
+
+    /**
+     *     this method will call initialize for each message, since we are caching the entity indexes,
+     *     we don't worry about aggregating by app id
+     * @param indexOperationMessage
+     */
+    private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
+
+        // create a set so we can have a unique list of appIds for which we call createEntityIndex
+        Set<UUID> appIds = new HashSet<>();
+
+        // loop through all indexRequests and add the appIds to the set
+        indexOperationMessage.getIndexRequests().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            appIds.add(appId);
+        });
+
+        // loop through all deindexRequests and add the appIds to the set
+        indexOperationMessage.getDeIndexRequests().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            appIds.add(appId);
+        });
+
+        // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
+        appIds.forEach(appId -> {
+                ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+                entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+            }
+        );
+    }
+
+
+    @Override
+    public long getQueueDepth() {
+        return queue.getQueueDepth();
+    }
+
+    @Override
+    public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+
+        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
+    }
+
+    public void handleEntityDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
+        Preconditions.checkArgument( event instanceof EntityDeleteEvent,
+            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
+
+
+        final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
+        final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
+        final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+
+        if (logger.isDebugEnabled())
+            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+
+        final EventBuilderImpl.EntityDeleteResults
+            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+
+
+        // Delete the entities and remove from graph separately
+        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
+
+        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
+
+        IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
+
+        queueIndexOperationMessage(indexMessage);
+
+    }
+
+
+    public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
+        Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
+
+        final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
+            ( InitializeApplicationIndexEvent ) event;
+
+        final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
+        final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
+        index.initialize();
+    }
+
+    /**
+     * Loop through and start the workers
+     */
+    public void start() {
+        final int count = indexProcessorFig.getWorkerCount();
+
+        for (int i = 0; i < count; i++) {
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Stop the workers
+     */
+    public void stop() {
+        synchronized (mutex) {
+            //stop consuming
+
+            for (final Subscription subscription : subscriptions) {
+                subscription.unsubscribe();
+            }
+        }
+    }
+
+
+    private void startWorker() {
+        synchronized (mutex) {
+
+            Observable<List<QueueMessage>> consumer =
+                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
+                        @Override
+                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
+
+                            //name our thread so it's easy to see
+                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+
+                            List<QueueMessage> drainList = null;
+
+                            do {
+                                try {
+                                    drainList = take();
+                                    //emit our list in it's entity to hand off to a worker pool
+                                        subscriber.onNext(drainList);
+
+                                    //take since  we're in flight
+                                    inFlight.addAndGet( drainList.size() );
+                                }
+                                catch ( Throwable t ) {
+                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+                                    logger.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
+
+                                    if ( drainList != null ) {
+                                        inFlight.addAndGet( -1 * drainList.size() );
+                                    }
+
+
+                                    try {
+                                        Thread.sleep( sleepTime );
+                                    }
+                                    catch ( InterruptedException ie ) {
+                                        //swallow
+                                    }
+
+                                    indexErrorCounter.inc();
+                                }
+                            }
+                            while ( true );
+                        }
+                    } )        //this won't block our read loop, just reads and proceeds
+                        .flatMap( sqsMessages -> {
+
+                            //do this on a different schedule, and introduce concurrency with flatmap for faster processing
+                            return Observable.just( sqsMessages )
+
+                                             .map( messages -> {
+                                                 if ( messages == null || messages.size() == 0 ) {
+                                                     return null;
+                                                 }
+
+                                                 try {
+                                                     List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+                                                     List<QueueMessage> messagesToAck = ackMessages( indexEventResults );
+
+                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+                                                         logger.error(
+                                                             "No messages came back from the queue operation, should have seen {} messages",
+                                                                 messages.size() );
+                                                         return messagesToAck;
+                                                     }
+
+                                                     if ( messagesToAck.size() < messages.size() ) {
+                                                         logger.error( "Missing messages from queue post operation",
+                                                             messages, messagesToAck );
+                                                     }
+                                                     //ack each message, but only if we didn't error.
+                                                     ack( messagesToAck );
+                                                     return messagesToAck;
+                                                 }
+                                                 catch ( Exception e ) {
+                                                     logger.error( "failed to ack messages to sqs", e );
+                                                     return null;
+                                                     //do not rethrow so we can process all of them
+                                                 }
+                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+                            //end flatMap
+                        }, indexProcessorFig.getEventConcurrencyFactor() );
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
+
+            subscriptions.add(subscription);
+        }
+    }
+
+    /**
+     * Submit results to index and return the queue messages to be ack'd
+     * @param indexEventResults
+     * @return
+     */
+    private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) {
+        //if nothing came back then return null
+        if(indexEventResults==null){
+            return null;
+        }
+
+        // stream the messages to record the cycle time
+        return indexEventResults.stream()
+            .map(indexEventResult -> {
+                //record the cycle time
+                messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+                return indexEventResult;
+            })
+            // filter out messages that are not present, they were not processed and put into the results
+            .filter( result -> result.getQueueMessage().isPresent() )
+            .map(result -> result.getQueueMessage().get())
+            // collect
+            .collect(Collectors.toList());
+    }
+
+    public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
+        //change to id scope to avoid serialization issues
+        offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
+    }
+
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+
+        List batch = new ArrayList<EdgeScope>();
+        for ( EdgeScope e : edges){
+            //change to id scope to avoid serialization issues
+            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
+        }
+        offerBatch( batch );
+    }
+
+
+    public class IndexEventResult{
+        private final Optional<QueueMessage> queueMessage;
+        private final long creationTime;
+
+        public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){
+
+            this.queueMessage = queueMessage;
+            this.creationTime = creationTime;
+        }
+
+
+        public Optional<QueueMessage> getQueueMessage() {
+            return queueMessage;
+        }
+
+        public long getCreationTime() {
+            return creationTime;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 96da2df..abd4ce1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -104,14 +104,14 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
         switch (impl) {
             case LOCAL:
-                AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
+                AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
                 eventService.MAX_TAKE = 1000;
                 return eventService;
             case SQS:
                 throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
             case SNS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
+                return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 480756f..a47ec77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -39,14 +39,6 @@ import rx.Observable;
 public interface EventBuilder {
 
     /**
-     * Return the cold observable of entity index update operations
-     * @param applicationScope
-     * @param entity
-     * @return
-     */
-    Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
-
-    /**
      * Return the cold observable of the new edge operation
      * @param applicationScope
      * @param entity
@@ -69,7 +61,9 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId );
+    EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+
+
 
     /**
      * Re-index an entity in the scope provided

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 4e476db..2edc668 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -73,19 +73,6 @@ public class EventBuilderImpl implements EventBuilder {
     }
 
 
-    @Override
-    public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
-                                                                     final Entity entity ) {
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Indexing  in app scope {} entity {}", entity, applicationScope);
-        }
-
-        return indexService.indexEntity( applicationScope, entity );
-    }
-
 
     @Override
     public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
@@ -118,7 +105,7 @@ public class EventBuilderImpl implements EventBuilder {
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
     @Override
-    public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+    public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
         if (logger.isDebugEnabled()) {
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
new file mode 100644
index 0000000..c0e022f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.UUID;
+
+public class IndexDocNotFoundException extends RuntimeException {
+
+    final UUID batchId;
+
+    public IndexDocNotFoundException(final UUID batchId){
+
+        super("Index batch ID "+batchId.toString()+" not found in map persistence");
+        this.batchId = batchId;
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 68c398f..7512c90 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.index;
 
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.utils.UUIDUtils;
@@ -104,7 +105,8 @@ public class IndexServiceImpl implements IndexService {
 
         //do our observable for batching
         //try to send a whole batch if we can
-        final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() )
+        final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex
+            .buffer( indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS )
 
             //map into batches based on our buffer size
             .flatMap( buffer -> Observable.from( buffer )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index bf444b5..d47e96c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -230,10 +230,16 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
 
 
             //entity is newer than ES version, could be an update or the entity is marked as deleted
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ||
+                    !entity.getEntity().isPresent()  ||
+                    entity.getStatus() == MvccEntity.Status.DELETED ) {
 
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                // when updating entities, we don't delete previous versions from ES so this action is expected
+                if(logger.isDebugEnabled()){
+                    logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}",
                         searchEdge, entityId, entityVersion);
+                }
+
                 batch.deindex( searchEdge, entityId, candidateVersion );
                 return;
             }


[02/10] usergrid git commit: Fixed test to properly verify issue and ignored it due to not currently supporting keeping custom collections. Commented out the code for dynamic collections as it isn't currently supported.

Posted by mr...@apache.org.
Fixed test to properly verify issue and ignored it due to not currently supporting keeping custom collections.
Commented out the code for dynamic collections as it isn't currently supported.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/68b079a3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/68b079a3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/68b079a3

Branch: refs/heads/release-2.1.1
Commit: 68b079a39db53b61429949857f72dec66330203f
Parents: 12d344f
Author: George Reyes <gr...@apache.org>
Authored: Fri Feb 19 10:58:41 2016 -0800
Committer: George Reyes <gr...@apache.org>
Committed: Fri Feb 19 10:58:41 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 21 ++++++++++----------
 .../collection/CollectionsResourceIT.java       | 11 ++++++++--
 2 files changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/68b079a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index d2f549b..9d2963c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -747,16 +747,17 @@ public class CpEntityManager implements EntityManager {
 
         Set<String> existingCollections = getRelationManager( getApplication() ).getCollections();
 
-        Set<String> dynamic_collections = cast( getDictionaryAsSet( getApplicationRef(), Schema.DICTIONARY_COLLECTIONS ) );
-        if ( dynamic_collections != null ) {
-            for ( String collection : dynamic_collections ) {
-                if ( !Schema.isAssociatedEntityType( collection ) ) {
-                    if(!existingCollections.contains( collection )) {
-                        existingCollections.add( collection );
-                    }
-                }
-            }
-        }
+        //Handles reading custom collections.
+//        Set<String> dynamic_collections = cast( getDictionaryAsSet( getApplicationRef(), Schema.DICTIONARY_COLLECTIONS ) );
+//        if ( dynamic_collections != null ) {
+//            for ( String collection : dynamic_collections ) {
+//                if ( !Schema.isAssociatedEntityType( collection ) ) {
+//                    if(!existingCollections.contains( collection )) {
+//                        existingCollections.add( collection );
+//                    }
+//                }
+//            }
+//        }
         Set<String> system_collections = Schema.getDefaultSchema().getCollectionNames( Application.ENTITY_TYPE );
         if ( system_collections != null ) {
             for ( String collection : system_collections ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/68b079a3/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
index 66c94ce..9752035 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
@@ -248,8 +248,10 @@ public class CollectionsResourceIT extends AbstractRestIT {
         }
     }
 
+    @Ignore("Ignored because we no longer retain custom collections after deleting the last entity in a collection"
+        + "This test can be used to verify that works when we implement it")
     @Test
-    public void testNewlyCreatedCollectionReturnWhenEmpty(){
+    public void testNewlyCreatedCollectionReturnsWhenEmpty(){
         String collectionName =  "testDefaultCollectionReturnings";
 
         Map<String,Object> payload = new HashMap(  );
@@ -265,6 +267,7 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         this.refreshIndex();
         this.app().collection( collectionName ).entity( testEntity.getEntity().getUuid() ).delete();
+        this.refreshIndex();
 
 
         //Verify that the collection still exists despite deleting its only entity.)
@@ -272,7 +275,11 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         collectionHashMap = ( LinkedHashMap ) usersDefaultCollection.getEntity().get( "metadata" );
 
-        assertNotSame( null,((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName ));
+        assertNotSame( null,((LinkedHashMap)(collectionHashMap.get( "collections" ))).get( collectionName.toLowerCase() ));
+
+        Collection createdCollectionResponse = this.app().collection( collectionName ).get();
+
+        assertEquals( 0,createdCollectionResponse.getNumOfEntities() );
     }