You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/17 18:00:37 UTC

[01/13] git commit: Work in progress on refactoring visitor to only update entities since connections are handled internally

Repository: incubator-usergrid
Updated Branches:
  refs/heads/cloudformation-update 704c09d78 -> 059a95233


Work in progress on refactoring visitor to only update entities since connections are handled internally


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/163fa9ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/163fa9ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/163fa9ad

Branch: refs/heads/cloudformation-update
Commit: 163fa9adf163226521d5724dce1cdac974de3b38
Parents: 9da529b
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 11:39:12 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 11:39:12 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  83 ++--------
 .../corepersistence/CpEntityManagerFactory.java |  22 +--
 .../usergrid/corepersistence/CpVisitor.java     |  11 +-
 .../usergrid/corepersistence/CpWalker.java      | 155 +++++++------------
 .../persistence/EntityManagerFactory.java       |   8 +-
 .../org/apache/usergrid/CoreApplication.java    |   1 +
 .../PerformanceEntityRebuildIndexTest.java      |  20 ++-
 .../apache/usergrid/rest/SystemResource.java    |  47 +++---
 8 files changed, 135 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 8753670..4015014 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -109,6 +109,7 @@ import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
 import org.apache.usergrid.persistence.cassandra.util.TraceParticipant;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -2869,94 +2870,32 @@ public class CpEntityManager implements EntityManager {
      */
     public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception {
 
-        CpWalker walker = new CpWalker();
+        CpWalker walker = new CpWalker(po.getWriteDelayTime());
 
         walker.walkCollections( this, application, new CpVisitor() {
 
             @Override
             public void visitCollectionEntry(
-                    EntityManager em, String collName, Entity source, Entity target) {
+                    EntityManager em, String collName, Entity entity) {
 
                 try {
-                    em.update( target);
-                    po.onProgress(source, target, collName);
-
-                } catch (Exception ex) {
-                    logger.error("Error repersisting entity", ex);
+                    em.update( entity);
+                    po.onProgress(entity);
                 }
-            }
-
-            @Override
-            public void visitConnectionEntry(
-                    EntityManager em, String connType, Entity source, Entity target) {
-
-                try {
-                    em.update( target);
-                    po.onProgress(source, target, connType);
-
-                } catch (Exception ex) {
+                catch(WriteOptimisticVerifyException wo ){
+                    //swallow this, it just means this was already updated, which accomplishes our task.  Just ignore.
+                    logger.warn( "Someone beat us to updating entity {} in collection {}.  Ignoring.", entity.getName(), collName );
+                }
+                catch (Exception ex) {
                     logger.error("Error repersisting entity", ex);
                 }
             }
 
-        });
-    }
-
-
-    private void indexEntityIntoCollections( 
-            org.apache.usergrid.persistence.model.entity.Entity collectionEntity, 
-            org.apache.usergrid.persistence.model.entity.Entity memberEntity, 
-            String collName, 
-            boolean connectBack ) {
-
-        logger.debug("Indexing into collections {} {}:{} member {}:{}", new Object[] { 
-            collName, collectionEntity.getId().getType(), collectionEntity.getId().getUuid(),
-            memberEntity.getId().getType(), memberEntity.getId().getUuid() });
-
-        indexEntityIntoCollection( collectionEntity, memberEntity, collName);
 
-        CollectionInfo collection = getDefaultSchema()
-                .getCollection( memberEntity.getId().getType(), collName);
-
-        if (connectBack && collection != null && collection.getLinkedCollection() != null) {
-
-            logger.debug("Linking back from entity in collection {} to collection {}", 
-                collection.getName(), collection.getLinkedCollection());
-
-            indexEntityIntoCollections( 
-                memberEntity, collectionEntity, collection.getLinkedCollection(), false );
-        }
+        });
     }
 
 
-    void indexEntityIntoConnection(
-            org.apache.usergrid.persistence.model.entity.Entity sourceEntity,
-            org.apache.usergrid.persistence.model.entity.Entity targetEntity,
-            String connType) {
-
-        logger.debug("Indexing into connection {} source {}:{} target {}:{}", new Object[] { 
-            connType, sourceEntity.getId().getType(), sourceEntity.getId().getUuid(),
-            targetEntity.getId().getType(), targetEntity.getId().getUuid() });
-
-
-        final EntityIndex ei = getManagerCache().getEntityIndex(getApplicationScope());
-        final EntityIndexBatch batch = ei.createBatch();
-
-        // Index the new connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-                sourceEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( targetEntity.getId().getType(), connType ));
-        batch.index(indexScope, targetEntity);
-        
-        // Index the new connection in app|scope|all-types context
-        IndexScope allTypesIndexScope = new IndexScopeImpl(
-                sourceEntity.getId(),
-                CpNamingUtils.ALL_TYPES);
-
-        batch.index(allTypesIndexScope, targetEntity);
-
-        batch.execute();
-    }
 
 
     void indexEntityIntoCollection(

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 2abe83f..8ab0b7f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -101,7 +101,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public static final  UUID DEFAULT_APPLICATION_ID =
             UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9");
 
-    private static AtomicBoolean INIT_SYSTEM = new AtomicBoolean(  );
+    private AtomicBoolean init_indexes = new AtomicBoolean(  );
 
 
     // cache of already instantiated entity managers
@@ -201,6 +201,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private EntityManager _getEntityManager( UUID applicationId ) {
         EntityManager em = new CpEntityManager();
         em.init( this, applicationId );
+        //TODO T.N. Can we remove this?  Seems like we should fix our lifecycle instead...
+        em.createIndex();
         return em;
     }
 
@@ -628,7 +630,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     private void maybeCreateIndexes() {
         // system app
-        if ( INIT_SYSTEM.getAndSet( true ) ) {
+        if ( init_indexes.getAndSet( true ) ) {
             return;
         }
 
@@ -641,16 +643,16 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private List<EntityIndex> getManagementIndexes() {
 
         return Arrays.asList(
-                getManagerCache().getEntityIndex( new ApplicationScopeImpl(
-                        new SimpleId( SYSTEM_APP_ID, "application" ) ) ),
+                getManagerCache().getEntityIndex(
+                        new ApplicationScopeImpl( new SimpleId( SYSTEM_APP_ID, "application" ) ) ),
 
                 // default app
-               getManagerCache().getEntityIndex( new ApplicationScopeImpl(
-                        new SimpleId( getManagementAppId(), "application" ) ) ),
+               getManagerCache().getEntityIndex(
+                       new ApplicationScopeImpl( new SimpleId( getManagementAppId(), "application" ) ) ),
 
                 // management app
-               getManagerCache().getEntityIndex( new ApplicationScopeImpl(
-                        new SimpleId( getDefaultAppId(), "application" ) ) ) );
+               getManagerCache().getEntityIndex(
+                       new ApplicationScopeImpl( new SimpleId( getDefaultAppId(), "application" ) ) ) );
     }
 
 
@@ -682,8 +684,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityManager em = getEntityManager( appId );
         Application app = em.getApplication();
 
-        ((CpEntityManager)em).reindex( po );
-        em.refreshIndex();
+        em.reindex( po );
+//        em.refreshIndex();
 
         logger.info("\n\nRebuilt index for application {} id {}\n", app.getName(), appId );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
index 5d32235..aa06744 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
@@ -24,9 +24,12 @@ import org.apache.usergrid.persistence.EntityManager;
  */
 public interface CpVisitor {
 
+    /**
+     * Visit the entity as we're walking the structure
+     * @param em
+     * @param collName
+     * @param visitedEntity
+     */
     public void visitCollectionEntry( 
-        EntityManager em, String collName, Entity sourceEntity, Entity targetEntity );
-
-    public void visitConnectionEntry( 
-        EntityManager em, String connType, Entity sourceEntity, Entity targetEntity );
+        EntityManager em, String collName, Entity visitedEntity );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index b507edd..d491d3d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -15,12 +15,15 @@
  */
 package org.apache.usergrid.corepersistence;
 
+
 import java.util.Stack;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityRef;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -30,10 +33,12 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import rx.Observable;
 import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 
 
 /**
@@ -43,125 +48,83 @@ public class CpWalker {
 
     private static final Logger logger = LoggerFactory.getLogger( CpWalker.class );
 
-    private long writeDelayMs = 100;
+    private final long writeDelayMs;
+
 
+    /**
+     * Wait the set amount of time between successive writes.
+     * @param writeDelayMs
+     */
+    public CpWalker(final long writeDelayMs){
+         this.writeDelayMs = writeDelayMs;
+    }
 
-    public void walkCollections( final CpEntityManager em, final EntityRef start, 
-            final CpVisitor visitor ) throws Exception {
 
-        Stack stack = new Stack();
-        Id appId = new SimpleId( em.getApplicationId(), TYPE_APPLICATION );
-        stack.push( appId );
+    public void walkCollections( final CpEntityManager em, final EntityRef start, final CpVisitor visitor )
+            throws Exception {
 
-        doWalkCollections(em, new SimpleId(start.getUuid(), start.getType()), visitor, new Stack());
+        doWalkCollections( em, new SimpleId( start.getUuid(), start.getType() ), visitor );
     }
 
 
-    private void doWalkCollections( final CpEntityManager em, final Id start, 
-            final CpVisitor visitor, final Stack stack ) {
+    private void doWalkCollections( final CpEntityManager em, final Id start, final CpVisitor visitor ) {
 
         final Id fromEntityId = new SimpleId( start.getUuid(), start.getType() );
 
         final ApplicationScope applicationScope = em.getApplicationScope();
 
-        final GraphManager gm = em.getManagerCache().getGraphManager(applicationScope);
-
-        logger.debug("Loading edges types from {}:{}\n   scope {}:{}",
-            new Object[] { start.getType(), start.getUuid(),
-                applicationScope.getApplication().getType(),
-                applicationScope.getApplication().getUuid()
-            });
+        final GraphManager gm = em.getManagerCache().getGraphManager( applicationScope );
 
-        Observable<String> edgeTypes = gm.getEdgeTypesFromSource( 
-                new SimpleSearchEdgeType( fromEntityId, null , null ));
+        logger.debug( "Loading edges types from {}:{}\n   scope {}:{}", new Object[] {
+                        start.getType(), start.getUuid(), applicationScope.getApplication().getType(),
+                        applicationScope.getApplication().getUuid()
+                } );
 
-        edgeTypes.forEach( new Action1<String>() {
+        Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
+                new SimpleSearchEdgeType( fromEntityId, CpNamingUtils.EDGE_COLL_SUFFIX, null ) );
 
+        edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
             @Override
-            public void call( final String edgeType ) {
-
-                try {
-                    Thread.sleep( writeDelayMs );
-                } catch ( InterruptedException ignored ) {}
+            public Observable<Edge> call( final String edgeType ) {
 
-                logger.debug("Loading edges of edgeType {} from {}:{}\n   scope {}:{}",
-                    new Object[] { edgeType, start.getType(), start.getUuid(),
-                        applicationScope.getApplication().getType(),
+                logger.debug( "Loading edges of edgeType {} from {}:{}\n   scope {}:{}", new Object[] {
+                        edgeType, start.getType(), start.getUuid(), applicationScope.getApplication().getType(),
                         applicationScope.getApplication().getUuid()
-                });
+                } );
 
+                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( fromEntityId, edgeType, Long.MAX_VALUE,
+                                SearchByEdgeType.Order.DESCENDING, null ) );
+            }
+        } ).doOnNext( new Action1<Edge>() {
 
-                Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType(fromEntityId, CpNamingUtils.EDGE_COLL_SUFFIX, null ));
-
-
-
-                Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( 
-                        fromEntityId, edgeType, Long.MAX_VALUE, 
-                        SearchByEdgeType.Order.DESCENDING, null ));
-
-                edges.forEach( new Action1<Edge>() {
-
-                    @Override
-                    public void call( Edge edge ) {
-
-                        EntityRef sourceEntityRef = new SimpleEntityRef( 
-                            edge.getSourceNode().getType(), edge.getSourceNode().getUuid());
-                        Entity sourceEntity;
-                        try {
-                            sourceEntity = em.get( sourceEntityRef );
-                        } catch (Exception ex) {
-                            logger.error( "Error getting sourceEntity {}:{}, continuing", 
-                                    sourceEntityRef.getType(), sourceEntityRef.getUuid());
-                            return;
-                        }
-
-                        EntityRef targetEntityRef = new SimpleEntityRef( 
-                            edge.getTargetNode().getType(), edge.getTargetNode().getUuid());
-                        Entity targetEntity;
-                        try {
-                            targetEntity = em.get( targetEntityRef );
-                        } catch (Exception ex) {
-                            logger.error( "Error getting sourceEntity {}:{}, continuing", 
-                                    sourceEntityRef.getType(), sourceEntityRef.getUuid());
-                            return;
-                        }
-                            
-                        if ( CpNamingUtils.isCollectionEdgeType( edge.getType() )) {
-
-                            String collName = CpNamingUtils.getCollectionName( edgeType );
-
-                            visitor.visitCollectionEntry( 
-                                    em, collName, sourceEntity, targetEntity );
-
-                            // recursion
-                            if ( !stack.contains( targetEntity.getUuid() )) {
-                                stack.push( targetEntity.getUuid() );
-                                doWalkCollections( em, edge.getSourceNode(), visitor, stack );
-                                stack.pop(); 
-                            }
+            @Override
+            public void call( Edge edge ) {
 
-                        } else {
+                EntityRef sourceEntityRef =
+                        new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                            String collName = CpNamingUtils.getConnectionType(edgeType);
+                Entity entity;
+                try {
+                    entity = em.get( sourceEntityRef );
+                }
+                catch ( Exception ex ) {
+                    logger.error( "Error getting sourceEntity {}:{}, continuing", sourceEntityRef.getType(),
+                            sourceEntityRef.getUuid() );
+                    return;
+                }
 
-                            visitor.visitConnectionEntry( 
-                                    em, collName, sourceEntity, targetEntity );
 
-                            // recursion
-                            if ( !stack.contains( targetEntity.getUuid() )) {
-                                stack.push( targetEntity.getUuid() );
-                                doWalkCollections( em, edge.getTargetNode(), visitor, stack );
-                                stack.pop(); 
-                            }
-                        }
-                    }
+                String collName = CpNamingUtils.getCollectionName( edge.getType() );
 
-                }); // end foreach on edges
+                visitor.visitCollectionEntry( em, collName, entity );
 
+                try {
+                    Thread.sleep( writeDelayMs );
+                }
+                catch ( InterruptedException e ) {
+                    throw new RuntimeException( "Unable to wait" );
+                }
             }
-
-        }); // end foreach on edgeTypes
-
+        } ).toBlocking().lastOrDefault( null ); // end foreach on edges
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index 06c3114..e57aa69 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -131,6 +131,12 @@ public interface EntityManagerFactory {
     public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver object);
 
     public interface ProgressObserver {
-        public void onProgress( EntityRef source, EntityRef target, String edgeType );
+        public void onProgress( EntityRef entity);
+
+        /**
+         * Get the write delay time from the progress observer.  Used to throttle writes
+         * @return
+         */
+        public long getWriteDelayTime();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index c790f64..51e825b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -150,6 +150,7 @@ public class CoreApplication implements Application, TestRule {
         id = setup.createApplication( orgName, appName );
         assertNotNull( id );
 
+        setup.getEmf().refreshIndex();
         em = setup.getEmf().getEntityManager( id );
         em.createIndex();
         assertNotNull( em );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 8de520a..d2f7fef 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -60,14 +60,13 @@ import static org.junit.Assert.fail;
 //@RunWith(JukitoRunner.class)
 //@UseModules({ GuiceModule.class })
 @Concurrent()
-@Ignore("Temporary ignore")
 public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
 
     private static final MetricRegistry registry = new MetricRegistry();
     private Slf4jReporter reporter;
 
-    private static final long RUNTIME = TimeUnit.MINUTES.toMillis( 1 );
+    private static final long RUNTIME = TimeUnit.SECONDS.toMillis( 5 );
 
     private static final long writeDelayMs = 100;
     //private static final long readDelayMs = 7;
@@ -192,18 +191,26 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
             int counter = 0;
             @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
+               public void onProgress( final EntityRef entity ) {
+
                 meter.mark();
-                logger.debug("Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                    s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype });
+                logger.debug("Indexing from {}:{}", entity.getType(), entity.getUuid());
                 if ( !logger.isDebugEnabled() && counter % 100 == 0 ) {
                     logger.info("Reindexed {} entities", counter );
                 }
                 counter++;
             }
+
+
+
+            @Override
+            public long getWriteDelayTime() {
+                return 0;
+            }
         };
 
         try {
+//            setup.getEmf().refreshIndex();
             setup.getEmf().rebuildAllIndexes( po );
 
             registry.remove( meterName );
@@ -236,9 +243,6 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         eeii.deleteIndex();
     }
 
-    private int readData( String collectionName ) throws Exception {
-        return readData( collectionName, -1 );
-    }
 
     private int readData( String collectionName, int expected ) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
index ceec656..28a7120 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
@@ -124,11 +124,17 @@ public class SystemResource extends AbstractContextResource {
 
 
         final ProgressObserver po = new ProgressObserver() {
+
+
+            @Override
+            public void onProgress( final EntityRef entity ) {
+                logger.info( "Indexing from {}:{} ", entity.getType(), entity.getUuid() );
+            }
+
+
             @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
-                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                        s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype
-                } );
+            public long getWriteDelayTime() {
+                return 0;
             }
         };
 
@@ -168,7 +174,8 @@ public class SystemResource extends AbstractContextResource {
     public JSONWithPadding rebuildIndexes( 
                 @Context UriInfo ui, 
                 @PathParam( "applicationId" ) String applicationIdStr,
-                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+                @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay)
 
             throws Exception {
 
@@ -177,14 +184,6 @@ public class SystemResource extends AbstractContextResource {
         response.setAction( "rebuild indexes" );
 
 
-        final ProgressObserver po = new ProgressObserver() {
-            @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
-                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                        s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype
-                } );
-            }
-        };
 
 
         final EntityManager em = emf.getEntityManager( appId );
@@ -199,7 +198,7 @@ public class SystemResource extends AbstractContextResource {
 
 
                 {
-                    rebuildCollection( appId, collectionName );
+                    rebuildCollection( appId, collectionName, delay );
                 }
             }
         };
@@ -221,7 +220,8 @@ public class SystemResource extends AbstractContextResource {
                 @Context UriInfo ui,
                 @PathParam( "applicationId" ) final String applicationIdStr,
                 @PathParam( "collectionName" ) final String collectionName,
-                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+                @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
             throws Exception {
 
         final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
@@ -232,7 +232,7 @@ public class SystemResource extends AbstractContextResource {
 
             public void run() {
 
-                rebuildCollection( appId, collectionName );
+                rebuildCollection( appId, collectionName, delay );
             }
         };
 
@@ -247,13 +247,18 @@ public class SystemResource extends AbstractContextResource {
     }
 
 
-    private void rebuildCollection( final UUID applicationId, final String collectionName ) {
+    private void rebuildCollection( final UUID applicationId, final String collectionName, final long delay ) {
         EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+
+            @Override
+            public void onProgress( final EntityRef entity ) {
+                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", entity.getType(), entity.getUuid());
+            }
+
+
             @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
-                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                        s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype
-                } );
+            public long getWriteDelayTime() {
+                return delay;
             }
         };
 


[03/13] git commit: Adds explicit refresh after create to ensure the index is actually ready to receive data.

Posted by to...@apache.org.
Adds explicit refresh after create to ensure the index is actually ready to receive data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/54b9a119
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/54b9a119
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/54b9a119

Branch: refs/heads/cloudformation-update
Commit: 54b9a1197edc6498629f3bdb78f95408c24c566d
Parents: f24f3ea
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 15:11:45 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 15:11:45 2014 -0600

----------------------------------------------------------------------
 .../index/impl/EsEntityIndexImpl.java           | 23 ++++++++++++++++++++
 1 file changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/54b9a119/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index e55f7e0..495b9e1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -87,6 +87,9 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     private final IndexFig config;
 
+    private static final int MAX_WAITS = 10;
+    private static final int WAIT_TIME = 250;
+
 
     @Inject
     public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
@@ -120,6 +123,26 @@ public class EsEntityIndexImpl implements EntityIndex {
                 response = admin.indices().prepareRefresh( indexName ).execute().actionGet();
             }
             while ( response.getFailedShards() != 0 );
+
+            //now try to refresh, to ensure that it's recognized by everyone.  Occasionally we can get a success
+            //before we can write.
+            for(int i = 0 ; i < MAX_WAITS; i++ ){
+                try{
+                    refresh();
+                    break;
+
+                }catch(Exception e){
+                   log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
+                }
+
+                try {
+                    Thread.sleep( WAIT_TIME );
+                }
+                catch ( InterruptedException e ) {
+                    //swallow it
+                }
+            }
+
             //
             //            response.getFailedShards();
             //


[10/13] git commit: Added verification to init to catch NPE issues

Posted by to...@apache.org.
Added verification to init to catch NPE issues

Updated index creation to write a document and delete to verify correct functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/722c43b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/722c43b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/722c43b7

Branch: refs/heads/cloudformation-update
Commit: 722c43b731a80c810916a09af8e50c19ac7c6891
Parents: 83200bf
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 20:53:59 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 20:53:59 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  20 +--
 .../usergrid/persistence/EntityManager.java     |   2 +-
 .../index/impl/EsEntityIndexImpl.java           | 144 ++++++++++++++-----
 3 files changed, 121 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/722c43b7/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 2498dda..751b9e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -16,6 +16,7 @@
 package org.apache.usergrid.corepersistence;
 
 
+import com.google.common.base.Preconditions;
 import com.netflix.hystrix.exception.HystrixRuntimeException;
 import com.yammer.metrics.annotation.Metered;
 import static java.lang.String.CASE_INSENSITIVE_ORDER;
@@ -191,6 +192,9 @@ public class CpEntityManager implements EntityManager {
     @Override
     public void init( EntityManagerFactory emf, UUID applicationId ) {
 
+        Preconditions.checkNotNull(emf, "emf must not be null");
+        Preconditions.checkNotNull( applicationId, "applicationId must not be null"  );
+
         this.emf = ( CpEntityManagerFactory ) emf;
         this.managerCache = this.emf.getManagerCache();
         this.applicationId = applicationId;
@@ -202,9 +206,6 @@ public class CpEntityManager implements EntityManager {
 
         // set to false for now
         this.skipAggregateCounters = false;
-
-
-        applicationScope = this.emf.getApplicationScope( applicationId );
     }
 
 
@@ -643,11 +644,11 @@ public class CpEntityManager implements EntityManager {
         return getRelationManager( entityRef ).searchCollection( collectionName, query );
     }
 
-
-    @Override
-    public void setApplicationId( UUID applicationId ) {
-        this.applicationId = applicationId;
-    }
+//
+//    @Override
+//    public void setApplicationId( UUID applicationId ) {
+//        this.applicationId = applicationId;
+//    }
 
 
     @Override
@@ -2884,7 +2885,8 @@ public class CpEntityManager implements EntityManager {
                 }
                 catch(WriteOptimisticVerifyException wo ){
                     //swallow this, it just means this was already updated, which accomplishes our task.  Just ignore.
-                    logger.warn( "Someone beat us to updating entity {} in collection {}.  Ignoring.", entity.getName(), collName );
+                    logger.warn( "Someone beat us to updating entity {} in collection {}.  Ignoring.", entity.getName(),
+                            collName );
                 }
                 catch (Exception ex) {
                     logger.error("Error repersisting entity", ex);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/722c43b7/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index 3684d7e..cd92729 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -41,7 +41,7 @@ import org.apache.usergrid.persistence.index.query.Query.Level;
  */
 public interface EntityManager {
 
-    public void setApplicationId( UUID applicationId );
+//    public void setApplicationId( UUID applicationId );
 
     public GeoIndexManager getGeoIndexManager();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/722c43b7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index b562d8a..7584386 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -36,7 +35,9 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.search.SearchHit;
@@ -58,7 +59,9 @@ import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
@@ -88,11 +91,18 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     private final IndexFig config;
 
-    //number of times to wait for the index to refresh propertly. Is an N+1, so 9 = 10
-    private static final int MAX_WAITS = 9;
+    //number of times to wait for the index to refresh properly.
+    private static final int MAX_WAITS = 10;
     //number of milliseconds to try again before sleeping
     private static final int WAIT_TIME = 250;
 
+    private static final String VERIFY_TYPE = "verification";
+
+    private static final ImmutableMap<String, Object> DEFAULT_PAYLOAD =
+            ImmutableMap.<String, Object>of( "field", "test" );
+
+    private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery();
+
 
     @Inject
     public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
@@ -120,21 +130,16 @@ public class EsEntityIndexImpl implements EntityIndex {
             CreateIndexResponse cir = admin.indices().prepareCreate( indexName ).execute().actionGet();
             log.info( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
 
-            RefreshResponse response;
-
-            do {
-                response = admin.indices().prepareRefresh( indexName ).execute().actionGet();
-            }
-            while ( response.getFailedShards() != 0 );
 
+            //create the document, this ensures the index is ready
             /**
-             * Immediately refresh to ensure the entire cluster is ready to receive this write.  Occasionally we see
+             * Immediately create a document and remove it to ensure the entire cluster is ready to receive documents
+             * .  Occasionally we see
              * errors.  See this post.
-             * http://elasticsearch-users.115913.n3.nabble
-             * .com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
+             * http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
              *
              */
-            refresh();
+            testNewIndex();
         }
         catch ( IndexAlreadyExistsException expected ) {
             // this is expected to happen if index already exists, it's a no-op and swallow
@@ -146,6 +151,44 @@ public class EsEntityIndexImpl implements EntityIndex {
 
 
     /**
+     * Tests writing a document to a new index to ensure it's working correctly.  Comes from email
+     *
+     * http://elasticsearch-users.115913.n3.nabble
+     * .com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
+     */
+
+    private void testNewIndex() {
+
+
+        log.info( "Refreshing Created new Index Name [{}]", indexName );
+
+        final RetryOperation retryOperation = new RetryOperation() {
+            @Override
+            public boolean doOp() {
+                final String tempId = UUIDGenerator.newTimeUUID().toString();
+
+
+                client.prepareIndex( indexName, VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD ).get();
+
+                log.info( "Successfully created new document with docId {} in index {} and type {}", tempId, indexName,
+                        VERIFY_TYPE );
+
+                //delete all types, this way if we miss one it will get cleaned up
+
+                client.prepareDeleteByQuery( indexName ).setTypes( VERIFY_TYPE ).setQuery( MATCH_ALL_QUERY_BUILDER )
+                      .get();
+
+                log.info( "Successfully deleted all documents in index {} and type {}", indexName, VERIFY_TYPE );
+
+                return true;
+            }
+        };
+
+        doInRetry( retryOperation );
+    }
+
+
+    /**
      * Setup ElasticSearch type mappings as a template that applies to all new indexes. Applies to all indexes that
      * start with our prefix.
      */
@@ -277,30 +320,22 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         log.info( "Refreshing Created new Index Name [{}]", indexName );
 
-        //now try to refresh, to ensure that it's recognized by everyone.  Occasionally we can get a success
-        //before we can write.
-        for ( int i = 0; i < MAX_WAITS; i++ ) {
-            try {
-                client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
-                log.debug( "Refreshed index: " + indexName );
-                return;
-            }
-            catch ( IndexMissingException e ) {
-                log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
-            }
-
-            try {
-                Thread.sleep( WAIT_TIME );
-            }
-            catch ( InterruptedException e ) {
-                //swallow it
+        final RetryOperation retryOperation = new RetryOperation() {
+            @Override
+            public boolean doOp() {
+                try {
+                    client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+                    log.debug( "Refreshed index: " + indexName );
+                    return true;
+                }
+                catch ( IndexMissingException e ) {
+                    log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
+                    throw e;
+                }
             }
-        }
+        };
 
-        /**
-         * Try the refresh one last time if we get here
-         */
-        client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+        doInRetry( retryOperation );
 
         log.debug( "Refreshed index: " + indexName );
     }
@@ -328,4 +363,43 @@ public class EsEntityIndexImpl implements EntityIndex {
             log.info( "Failed to delete index " + indexName );
         }
     }
+
+
+    /**
+     * Do the retry operation
+     * @param operation
+     */
+    private void doInRetry( final RetryOperation operation ) {
+        for ( int i = 0; i < MAX_WAITS; i++ ) {
+
+            try {
+                if(operation.doOp()){
+                    return;
+                }
+            }
+            catch ( Exception e ) {
+                log.error( "Unable to execute operation, retrying", e );
+            }
+
+
+            try {
+                Thread.sleep( WAIT_TIME );
+            }
+            catch ( InterruptedException e ) {
+                //swallow it
+            }
+        }
+    }
+
+
+    /**
+     * Interface for operations
+     */
+    private static interface RetryOperation {
+
+        /**
+         * Return true if done, false if there should be a retry
+         */
+        public boolean doOp();
+    }
 }


[12/13] git commit: Fixes migration bug

Posted by to...@apache.org.
Fixes migration bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8d9d4dcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8d9d4dcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8d9d4dcf

Branch: refs/heads/cloudformation-update
Commit: 8d9d4dcf043240a9c53902b27e3daa23cd9987de
Parents: bd00ee0
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 23:24:50 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 23:24:50 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  8 +++++++
 .../usergrid/corepersistence/CpWalker.java      | 15 ++++++-------
 .../PerformanceEntityRebuildIndexTest.java      |  2 +-
 .../persistence/index/impl/EsProvider.java      | 22 +++++++++++++-------
 .../apache/usergrid/rest/SystemResource.java    |  7 ++-----
 5 files changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d9d4dcf/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e013957..bc45769 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -95,9 +95,15 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public static final UUID SYSTEM_APP_ID =
             UUID.fromString("b6768a08-b5d5-11e3-a495-10ddb1de66c3");
 
+    /**
+     * App where we store management info
+     */
     public static final  UUID MANAGEMENT_APPLICATION_ID =
             UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c8");
 
+    /**
+     * TODO Dave what is this?
+     */
     public static final  UUID DEFAULT_APPLICATION_ID =
             UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9");
 
@@ -679,6 +685,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     @Override
     public void rebuildInternalIndexes( ProgressObserver po ) throws Exception {
         rebuildApplicationIndexes(SYSTEM_APP_ID, po);
+        rebuildApplicationIndexes( MANAGEMENT_APPLICATION_ID, po );
+        rebuildApplicationIndexes( DEFAULT_APPLICATION_ID, po );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d9d4dcf/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 908e6bc..636cc91 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -67,33 +67,32 @@ public class CpWalker {
     }
 
 
-    private void doWalkCollections( final CpEntityManager em, final Id start, final CpVisitor visitor ) {
-
-        final Id fromEntityId = new SimpleId( start.getUuid(), start.getType() );
+    private void doWalkCollections( final CpEntityManager em, final Id applicationId, final CpVisitor visitor ) {
 
         final ApplicationScope applicationScope = em.getApplicationScope();
 
         final GraphManager gm = em.getManagerCache().getGraphManager( applicationScope );
 
         logger.debug( "Loading edges types from {}:{}\n   scope {}:{}", new Object[] {
-                        start.getType(), start.getUuid(), applicationScope.getApplication().getType(),
+                applicationId.getType(), applicationId.getUuid(), applicationScope.getApplication().getType(),
                         applicationScope.getApplication().getUuid()
                 } );
 
         //only search edge types that start with collections
+
         Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
-                new SimpleSearchEdgeType( fromEntityId, CpNamingUtils.EDGE_COLL_SUFFIX, null ) );
+                       new SimpleSearchEdgeType( applicationId, CpNamingUtils.EDGE_COLL_SUFFIX+"users", null ) );
 
         edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
             @Override
             public Observable<Edge> call( final String edgeType ) {
 
                 logger.debug( "Loading edges of edgeType {} from {}:{}\n   scope {}:{}", new Object[] {
-                        edgeType, start.getType(), start.getUuid(), applicationScope.getApplication().getType(),
+                        edgeType, applicationId.getType(), applicationId.getUuid(), applicationScope.getApplication().getType(),
                         applicationScope.getApplication().getUuid()
                 } );
 
-                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( fromEntityId, edgeType, Long.MAX_VALUE,
+                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( applicationId, edgeType, Long.MAX_VALUE,
                                 SearchByEdgeType.Order.DESCENDING, null ) );
             }
         } ).doOnNext( new Action1<Edge>() {
@@ -101,6 +100,8 @@ public class CpWalker {
             @Override
             public void call( Edge edge ) {
 
+                logger.info( "Re-indexing edge {}", edge );
+
                 EntityRef targetNodeEntityRef =
                         new SimpleEntityRef( edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d9d4dcf/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 2ef65ef..ed3fd61 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -194,7 +194,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
                public void onProgress( final EntityRef entity ) {
 
                 meter.mark();
-                logger.debug("Indexing from {}:{}", entity.getType(), entity.getUuid());
+                logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
                 if ( !logger.isDebugEnabled() && counter % 100 == 0 ) {
                     logger.info("Reindexed {} entities", counter );
                 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d9d4dcf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
index aac9ff5..9b37952 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
@@ -31,8 +31,10 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.core.util.AvailablePortFinder;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
 import org.slf4j.Logger;
@@ -152,24 +154,28 @@ public class EsProvider {
 
                 Settings settings = ImmutableSettings.settingsBuilder()
 
-                    .put("cluster.name", fig.getClusterName())
+                    .put( "cluster.name", fig.getClusterName() )
 
                     // this assumes that we're using zen for host discovery.  Putting an 
                     // explicit set of bootstrap hosts ensures we connect to a valid cluster.
-                    .put("discovery.zen.ping.unicast.hosts", allHosts)
-                    .put("discovery.zen.ping.multicast.enabled","false")
+                    .put( "discovery.zen.ping.unicast.hosts", allHosts )
+                    .put( "discovery.zen.ping.multicast.enabled", "false" )
                     .put("http.enabled", false) 
 
-                    .put("client.transport.ping_timeout", 2000) // milliseconds
-                    .put("client.transport.nodes_sampler_interval", 100)
-                    .put("network.tcp.blocking", true)
-                    .put("node.client", true)
-                    .put("node.name",  nodeName )
+                    .put( "client.transport.ping_timeout", 2000 ) // milliseconds
+                    .put( "client.transport.nodes_sampler_interval", 100 )
+                    .put( "network.tcp.blocking", true )
+                    .put( "node.client", true )
+                    .put( "node.name", nodeName )
 
                     .build();
 
                 log.debug("Creating ElasticSearch client with settings: " +  settings.getAsMap());
 
+                //use this client when connecting via socket only, such as ssh tunnel or other firewall issues
+//                newClient  = new TransportClient(settings).addTransportAddress( new InetSocketTransportAddress("localhost", 9300) );
+
+                //use this client for quick connectivity
                 Node node = NodeBuilder.nodeBuilder().settings(settings)
                     .client(true).node();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d9d4dcf/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
index 28a7120..d068846 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
@@ -128,7 +128,7 @@ public class SystemResource extends AbstractContextResource {
 
             @Override
             public void onProgress( final EntityRef entity ) {
-                logger.info( "Indexing from {}:{} ", entity.getType(), entity.getUuid() );
+                logger.info( "Indexing entity {}:{} ", entity.getType(), entity.getUuid() );
             }
 
 
@@ -146,9 +146,6 @@ public class SystemResource extends AbstractContextResource {
                 logger.info( "Rebuilding all indexes" );
 
                 try {
-                    emf.rebuildInternalIndexes( po );
-                    emf.refreshIndex();
-
                     emf.rebuildAllIndexes( po );
                 }
                 catch ( Exception e ) {
@@ -252,7 +249,7 @@ public class SystemResource extends AbstractContextResource {
 
             @Override
             public void onProgress( final EntityRef entity ) {
-                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", entity.getType(), entity.getUuid());
+                logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid());
             }
 
 


[06/13] Fixed index refresh issue.

Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index d491d3d..908e6bc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -80,6 +80,7 @@ public class CpWalker {
                         applicationScope.getApplication().getUuid()
                 } );
 
+        //only search edge types that start with collections
         Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
                 new SimpleSearchEdgeType( fromEntityId, CpNamingUtils.EDGE_COLL_SUFFIX, null ) );
 
@@ -100,16 +101,16 @@ public class CpWalker {
             @Override
             public void call( Edge edge ) {
 
-                EntityRef sourceEntityRef =
-                        new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+                EntityRef targetNodeEntityRef =
+                        new SimpleEntityRef( edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
 
                 Entity entity;
                 try {
-                    entity = em.get( sourceEntityRef );
+                    entity = em.get( targetNodeEntityRef );
                 }
                 catch ( Exception ex ) {
-                    logger.error( "Error getting sourceEntity {}:{}, continuing", sourceEntityRef.getType(),
-                            sourceEntityRef.getUuid() );
+                    logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
+                            targetNodeEntityRef.getUuid() );
                     return;
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index d2f7fef..2ef65ef 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -213,6 +213,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 //            setup.getEmf().refreshIndex();
             setup.getEmf().rebuildAllIndexes( po );
 
+            reporter.report();
             registry.remove( meterName );
             logger.info("Rebuilt index");
 
@@ -236,7 +237,6 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         Id appId = new SimpleId( appUuid, "application");
         ApplicationScope scope = new ApplicationScopeImpl( appId );
-        IndexScope is = new IndexScopeImpl( appId, "application");
         EntityIndex ei = eif.createEntityIndex(scope);
         EsEntityIndexImpl eeii = (EsEntityIndexImpl)ei;
 
@@ -247,6 +247,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
     private int readData( String collectionName, int expected ) throws Exception {
 
         EntityManager em = app.getEntityManager();
+        em.refreshIndex();
 
         Query q = Query.fromQL("select * where key1=1000");
         q.setLimit(40);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 9ea14a1..821f7b9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -116,7 +116,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
             AdminClient admin = client.admin();
             CreateIndexResponse cir = admin.indices().prepareCreate( indexName ).execute().actionGet();
-            log.debug( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
+            log.info( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
 
             RefreshResponse response;
 
@@ -128,11 +128,11 @@ public class EsEntityIndexImpl implements EntityIndex {
             /**
              * Immediately refresh to ensure the entire cluster is ready to receive this write.  Occasionally we see
              * errors.  See this post.
-             * http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
+             * http://elasticsearch-users.115913.n3.nabble
+             * .com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
              *
              */
             refresh();
-
         }
         catch ( IndexAlreadyExistsException expected ) {
             // this is expected to happen if index already exists, it's a no-op and swallow
@@ -272,25 +272,33 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     public void refresh() {
 
-            //now try to refresh, to ensure that it's recognized by everyone.  Occasionally we can get a success
-            //before we can write.
-            for(int i = 0 ; i < MAX_WAITS; i++ ){
-                try{
-                    client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
-                    break;
 
-                }catch(IndexMissingException e){
-                   log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
-                }
+        log.info( "Refreshing Created new Index Name [{}]", indexName );
 
-                try {
-                    Thread.sleep( WAIT_TIME );
-                }
-                catch ( InterruptedException e ) {
-                    //swallow it
-                }
+        //now try to refresh, to ensure that it's recognized by everyone.  Occasionally we can get a success
+        //before we can write.
+        for ( int i = 0; i < MAX_WAITS; i++ ) {
+            try {
+                client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+                return;
+            }
+            catch ( IndexMissingException e ) {
+                log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
             }
 
+            try {
+                Thread.sleep( WAIT_TIME );
+            }
+            catch ( InterruptedException e ) {
+                //swallow it
+            }
+        }
+
+        /**
+         * Try the refresh one last time if we get here
+         */
+        client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+
         log.debug( "Refreshed index: " + indexName );
     }
 


[02/13] git commit: Moved create index down to cache init. Not ideal, but lowest least called point in the code until setup is refactored.

Posted by to...@apache.org.
Moved create index down to cache init.  Not ideal, but lowest least called point in the code until setup is refactored.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f24f3ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f24f3ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f24f3ea7

Branch: refs/heads/cloudformation-update
Commit: f24f3ea7cc70efa18de83308d4eb9011548517e4
Parents: 163fa9a
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 13:40:32 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 13:40:32 2014 -0600

----------------------------------------------------------------------
 .../apache/usergrid/corepersistence/CpEntityManagerFactory.java   | 3 +--
 stack/core/src/test/java/org/apache/usergrid/CoreApplication.java | 2 --
 2 files changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f24f3ea7/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 8ab0b7f..2514e20 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -146,7 +146,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 Map sysAppProps = new HashMap<String, Object>();
                 sysAppProps.put( PROPERTY_NAME, "systemapp");
                 em.create( SYSTEM_APP_ID, TYPE_APPLICATION, sysAppProps );
-                em.createIndex();
                 em.getApplication();
                 em.refreshIndex();
             }
@@ -287,7 +286,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityManager appEm = getEntityManager( applicationId );
 
         //create our ES index since we're initializing this application
-        appEm.createIndex();
+//  TODO T.N, pushed this down into the cache load      appEm.createIndex();
 
         appEm.create( applicationId, TYPE_APPLICATION, properties );
         appEm.resetRoles();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f24f3ea7/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 51e825b..fd8ca9b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -150,9 +150,7 @@ public class CoreApplication implements Application, TestRule {
         id = setup.createApplication( orgName, appName );
         assertNotNull( id );
 
-        setup.getEmf().refreshIndex();
         em = setup.getEmf().getEntityManager( id );
-        em.createIndex();
         assertNotNull( em );
 
         LOG.info( "Created new application {} in organization {}", appName, orgName );


[07/13] Fixed index refresh issue.

Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2a790ab..96f0e1e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -16,14 +16,10 @@
 
 package org.apache.usergrid.corepersistence;
 
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 
-import com.clearspring.analytics.hash.MurmurHash;
-import com.yammer.metrics.annotation.Metered;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import static java.util.Arrays.asList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -33,21 +29,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
 import org.apache.usergrid.corepersistence.results.ResultsLoader;
 import org.apache.usergrid.corepersistence.results.ResultsLoaderFactory;
 import org.apache.usergrid.corepersistence.results.ResultsLoaderFactoryImpl;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.utils.UUIDUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
 import org.apache.usergrid.persistence.ConnectedEntityRef;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityFactory;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.IndexBucketLocator;
@@ -56,43 +49,12 @@ import org.apache.usergrid.persistence.RelationManager;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.RoleRef;
 import org.apache.usergrid.persistence.Schema;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
-import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
-import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
-import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
-import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.SimpleRoleRef;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
 import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
 import org.apache.usergrid.persistence.cassandra.IndexUpdate;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
 import org.apache.usergrid.persistence.cassandra.QueryProcessorImpl;
-import static org.apache.usergrid.persistence.cassandra.Serializers.be;
 import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
@@ -114,6 +76,7 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -138,19 +101,59 @@ import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
 import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
 import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
-import static org.apache.usergrid.utils.ClassUtils.cast;
-import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
 import org.apache.usergrid.utils.IndexUtils;
-import static org.apache.usergrid.utils.InflectionUtils.singularize;
 import org.apache.usergrid.utils.MapUtils;
-import static org.apache.usergrid.utils.MapUtils.addMapSet;
-import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import com.yammer.metrics.annotation.Metered;
 
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.beans.DynamicComposite;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.mutation.Mutator;
 import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static java.util.Arrays.asList;
+
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
+import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
+import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
+import static org.apache.usergrid.persistence.cassandra.Serializers.be;
+import static org.apache.usergrid.utils.ClassUtils.cast;
+import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
+import static org.apache.usergrid.utils.InflectionUtils.singularize;
+import static org.apache.usergrid.utils.MapUtils.addMapSet;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
 
 
 /**
@@ -161,9 +164,8 @@ public class CpRelationManager implements RelationManager {
     private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
 
 
-
     private CpEntityManagerFactory emf;
-    
+
     private CpManagerCache managerCache;
 
     private EntityManager em;
@@ -184,23 +186,18 @@ public class CpRelationManager implements RelationManager {
     private ResultsLoaderFactory resultsLoaderFactory;
 
 
-
     public CpRelationManager() {}
 
 
-    public CpRelationManager init( 
-        EntityManager em, 
-        CpEntityManagerFactory emf, 
-        UUID applicationId,
-        EntityRef headEntity, 
-        IndexBucketLocator indexBucketLocator ) {
+    public CpRelationManager init( EntityManager em, CpEntityManagerFactory emf, UUID applicationId,
+                                   EntityRef headEntity, IndexBucketLocator indexBucketLocator ) {
 
         Assert.notNull( em, "Entity manager cannot be null" );
         Assert.notNull( emf, "Entity manager factory cannot be null" );
         Assert.notNull( applicationId, "Application Id cannot be null" );
         Assert.notNull( headEntity, "Head entity cannot be null" );
         Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
-       
+
         // TODO: this assert should not be failing
         //Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" );
 
@@ -209,31 +206,27 @@ public class CpRelationManager implements RelationManager {
         this.applicationId = applicationId;
         this.headEntity = headEntity;
         this.managerCache = emf.getManagerCache();
-        this.applicationScope = emf.getApplicationScope(applicationId);
+        this.applicationScope = emf.getApplicationScope( applicationId );
 
         this.cass = em.getCass(); // TODO: eliminate need for this via Core Persistence
         this.indexBucketLocator = indexBucketLocator; // TODO: this also
 
         // load the Core Persistence version of the head entity as well
-        this.headEntityScope = new CollectionScopeImpl( 
-            this.applicationScope.getApplication(), 
-            this.applicationScope.getApplication(), 
-                CpNamingUtils.getCollectionScopeNameFromEntityType( headEntity.getType() ));
+        this.headEntityScope =
+                new CollectionScopeImpl( this.applicationScope.getApplication(), this.applicationScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( headEntity.getType() ) );
 
-        EntityCollectionManager ecm = managerCache.getEntityCollectionManager(headEntityScope);
+        EntityCollectionManager ecm = managerCache.getEntityCollectionManager( headEntityScope );
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Loading head entity {}:{} from scope\n   app {}\n   owner {}\n   name {}", 
-                new Object[] {
-                    headEntity.getType(), 
-                    headEntity.getUuid(), 
-                    headEntityScope.getApplication(), 
-                    headEntityScope.getOwner(),
-                    headEntityScope.getName()
-            } );
+            logger.debug( "Loading head entity {}:{} from scope\n   app {}\n   owner {}\n   name {}", new Object[] {
+                            headEntity.getType(), headEntity.getUuid(), headEntityScope.getApplication(),
+                            headEntityScope.getOwner(), headEntityScope.getName()
+                    } );
         }
-        
-        this.cpHeadEntity = ecm.load( new SimpleId( 
-            headEntity.getUuid(), headEntity.getType() )).toBlocking().lastOrDefault(null);
+
+        //TODO PERFORMANCE why are we loading this again here?
+        this.cpHeadEntity = ecm.load( new SimpleId( headEntity.getUuid(), headEntity.getType() ) ).toBlocking()
+                               .lastOrDefault( null );
 
         // commented out because it is possible that CP entity has not been created yet
         Assert.notNull( cpHeadEntity, "cpHeadEntity cannot be null" );
@@ -243,22 +236,21 @@ public class CpRelationManager implements RelationManager {
         return this;
     }
 
-    
+
     @Override
     public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
         final Set<String> indexes = new HashSet<String>();
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
 
         String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
 
-        logger.debug("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", 
-            new Object[] {
-                edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid()
-        });
+        logger.debug( "getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", new Object[] {
+                        edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid()
+                } );
 
-        Observable<String> types= gm.getEdgeTypesFromSource( 
-            new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix,  null ));
+        Observable<String> types =
+                gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) );
 
         Iterator<String> iter = types.toBlockingObservable().getIterator();
         while ( iter.hasNext() ) {
@@ -275,8 +267,7 @@ public class CpRelationManager implements RelationManager {
         //Map<EntityRef, Set<String>> containerEntities = getContainers(-1, "owns", null);
         Map<EntityRef, Set<String>> containerEntities = getContainers();
 
-        Map<String, Map<UUID, Set<String>>> owners = 
-                new LinkedHashMap<String, Map<UUID, Set<String>>>();
+        Map<String, Map<UUID, Set<String>>> owners = new LinkedHashMap<String, Map<UUID, Set<String>>>();
 
         for ( EntityRef owner : containerEntities.keySet() ) {
             Set<String> collections = containerEntities.get( owner );
@@ -296,6 +287,7 @@ public class CpRelationManager implements RelationManager {
 
     /**
      * Gets containing collections and/or connections depending on the edge type you pass in
+     *
      * @param limit Max number to return
      * @param edgeType Edge type, edge type prefix or null to allow any edge type
      * @param fromEntityType Only consider edges from entities of this type
@@ -304,44 +296,43 @@ public class CpRelationManager implements RelationManager {
 
         Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        Iterator<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( 
-            cpHeadEntity.getId(), edgeType, null) ).toBlocking().getIterator();
+        Iterator<String> edgeTypes =
+                gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) ).toBlocking()
+                  .getIterator();
 
-        logger.debug("getContainers(): "
-                + "Searched for edges of type {}\n   to target {}:{}\n   in scope {}\n   found: {}", 
-            new Object[] {
-                edgeType,
-                cpHeadEntity.getId().getType(), 
-                cpHeadEntity.getId().getUuid(), 
-                applicationScope.getApplication(),
-                edgeTypes.hasNext()
-        });
+        logger.debug(
+                "getContainers(): " + "Searched for edges of type {}\n   to target {}:{}\n   in scope {}\n   found: {}",
+                new Object[] {
+                        edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+                        applicationScope.getApplication(), edgeTypes.hasNext()
+                } );
 
         while ( edgeTypes.hasNext() ) {
 
             String etype = edgeTypes.next();
 
-            Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-                cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+            Observable<Edge> edges = gm.loadEdgesToTarget(
+                    new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
+                            SearchByEdgeType.Order.DESCENDING, null ) );
 
             Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
             while ( iter.hasNext() ) {
                 Edge edge = iter.next();
 
-                if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() )) {
-                    logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
+                if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
+                    logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
                     continue;
                 }
 
-                EntityRef eref = new SimpleEntityRef( 
-                    edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+                EntityRef eref = new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
                 String name = null;
-                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
                     name = CpNamingUtils.getConnectionType( edge.getType() );
-                } else {
+                }
+                else {
                     name = CpNamingUtils.getCollectionName( edge.getType() );
                 }
                 addMapSet( results, eref, name );
@@ -356,183 +347,161 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    public List<String> updateContainingCollectionAndCollectionIndexes( 
-        Entity entity, org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
-
-        List<String> results = new ArrayList<String>();
+    public void updateContainingCollectionAndCollectionIndexes(
+            final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
 
-        Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( 
-            cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator();
+        final GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        logger.debug("updateContainingCollectionsAndCollections(): "
-                + "Searched for edges to target {}:{}\n   in scope {}\n   found: {}", 
-            new Object[] {
-                cpHeadEntity.getId().getType(), 
-                cpHeadEntity.getId().getUuid(), 
-                applicationScope.getApplication(),
-                edgeTypesToTarget.hasNext()
-        });
+        logger.debug( "updateContainingCollectionsAndCollections(): "
+                        + "Searched for edges to target {}:{}\n   in scope {}\n   found: {}", new Object[] {
+                        cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+                        applicationScope.getApplication()
+                } );
 
         // loop through all types of edge to target
-        int count = 0;
+
 
         final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
         final EntityIndexBatch entityIndexBatch = ei.createBatch();
 
-        while ( edgeTypesToTarget.hasNext() ) {
-
-            // get all edges of the type
-            String etype = edgeTypesToTarget.next();
+        final int count = gm.getEdgeTypesToTarget(
+                new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) )
+                //for each edge type, emit all the edges of that type
+                            .flatMap( new Func1<String, Observable<Edge>>() {
+                                @Override
+                                public Observable<Edge> call( final String etype ) {
+                                    return gm.loadEdgesToTarget(
+                                            new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
+                                                    SearchByEdgeType.Order.DESCENDING, null ) );
+                                }
+                            } )
 
-            Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-                cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,  null ));
+                            //for each edge we receive index and add to the batch
+                            .doOnNext( new Action1<Edge>() {
+                    @Override
+                    public void call( final Edge edge ) {
 
-            // loop through edges of that type
-            Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
-            while ( iter.hasNext() ) {
 
-                Edge edge = iter.next();
+                        EntityRef sourceEntity =
+                                new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                EntityRef sourceEntity = new SimpleEntityRef( 
-                    edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+                        // reindex the entity in the source entity's collection or connection index
 
-                // reindex the entity in the source entity's collection or connection index
+                        IndexScope indexScope;
+                        if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) {
 
-                IndexScope indexScope;
-                if ( CpNamingUtils.isCollectionEdgeType( edge.getType() )) {
+                            String collName = CpNamingUtils.getCollectionName( edge.getType() );
+                            indexScope =
+                                    new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
+                                            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+                        }
+                        else {
 
-                    String collName = CpNamingUtils.getCollectionName( edge.getType() );
-                    indexScope = new IndexScopeImpl(
-                        new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
-                        CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
+                            String connName = CpNamingUtils.getCollectionName( edge.getType() );
+                            indexScope =
+                                    new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
+                                            CpNamingUtils.getConnectionScopeName( cpEntity.getId().getType(),
+                                                    connName ) );
+                        }
 
-                } else {
+                        entityIndexBatch.index( indexScope, cpEntity );
 
-                    String connName = CpNamingUtils.getCollectionName( edge.getType() );
-                    indexScope = new IndexScopeImpl(
-                        new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
-                        CpNamingUtils.getConnectionScopeName( cpHeadEntity.getId().getType(), connName ));
-                }
+                        // reindex the entity in the source entity's all-types index
 
-                entityIndexBatch.index(indexScope, cpEntity);
+                        indexScope = new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
+                                CpNamingUtils.ALL_TYPES );
 
-                // reindex the entity in the source entity's all-types index
-                
-                indexScope = new IndexScopeImpl(
-                    new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
-                        CpNamingUtils.ALL_TYPES);
-
-                entityIndexBatch.index(indexScope, cpEntity);
+                        entityIndexBatch.index( indexScope, cpEntity );
+                    }
+                } ).count().toBlocking().lastOrDefault( 0 );
 
-                count++;
-            }
-        }
 
         entityIndexBatch.execute();
 
-        logger.debug("updateContainingCollectionsAndCollections() updated {} indexes", count);
-        return results;
+        logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
     }
 
 
     @Override
-    public boolean isConnectionMember(String connectionType, EntityRef entity) throws Exception {
+    public boolean isConnectionMember( String connectionType, EntityRef entity ) throws Exception {
 
         Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
 
         String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType );
 
-        logger.debug("isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", 
-            new Object[] { 
-                edgeType, 
-                headEntity.getType(), headEntity.getUuid(), 
-                entity.getType(), entity.getUuid() });
-
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-        Observable<Edge> edges = gm.loadEdgeVersions( 
-            new SimpleSearchByEdge(
-                new SimpleId(headEntity.getUuid(), headEntity.getType()), 
-                edgeType,  
-                entityId, 
-                Long.MAX_VALUE,  SearchByEdgeType.Order.DESCENDING,
-                null));
-
-        return edges.toBlockingObservable().firstOrDefault(null) != null;
+        logger.debug( "isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] {
+                        edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid()
+                } );
+
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+        Observable<Edge> edges = gm.loadEdgeVersions(
+                new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId,
+                        Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) );
+
+        return edges.toBlockingObservable().firstOrDefault( null ) != null;
     }
 
 
-    @SuppressWarnings("unchecked")
-    @Metered(group = "core", name = "RelationManager_isOwner")
+    @SuppressWarnings( "unchecked" )
+    @Metered( group = "core", name = "RelationManager_isOwner" )
     @Override
-    public boolean isCollectionMember(String collName, EntityRef entity) throws Exception {
+    public boolean isCollectionMember( String collName, EntityRef entity ) throws Exception {
 
         Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
 
         String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName );
 
-        logger.debug("isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", 
-            new Object[] { 
-                edgeType, 
-                headEntity.getType(), headEntity.getUuid(), 
-                entity.getType(), entity.getUuid() });
-
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-        Observable<Edge> edges = gm.loadEdgeVersions( 
-            new SimpleSearchByEdge(
-                new SimpleId(headEntity.getUuid(), headEntity.getType()), 
-                edgeType,  
-                entityId, 
-                Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                null));
-
-        return edges.toBlockingObservable().firstOrDefault(null) != null;
+        logger.debug( "isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] {
+                        edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid()
+                } );
+
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+        Observable<Edge> edges = gm.loadEdgeVersions(
+                new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId,
+                        Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) );
+
+        return edges.toBlockingObservable().firstOrDefault( null ) != null;
     }
 
 
-   private boolean moreThanOneInboundConnection( 
-           EntityRef target, String connectionType ) {
+    private boolean moreThanOneInboundConnection( EntityRef target, String connectionType ) {
 
         Id targetId = new SimpleId( target.getUuid(), target.getType() );
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        Observable<Edge> edgesToTarget = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-            targetId,
-            CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
-            System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,
-            null)); // last
+        Observable<Edge> edgesToTarget = gm.loadEdgesToTarget(
+                new SimpleSearchByEdgeType( targetId, CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
+                        System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) ); // last
 
         Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator();
         int count = 0;
         while ( iterator.hasNext() ) {
             iterator.next();
-            if ( count++ > 1 ) { 
+            if ( count++ > 1 ) {
                 return true;
             }
-        } 
+        }
         return false;
-   } 
+    }
 
-   private boolean moreThanOneOutboundConnection( 
-           EntityRef source, String connectionType ) {
+
+    private boolean moreThanOneOutboundConnection( EntityRef source, String connectionType ) {
 
         Id sourceId = new SimpleId( source.getUuid(), source.getType() );
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        Observable<Edge> edgesFromSource = gm.loadEdgesFromSource(new SimpleSearchByEdgeType(
-            sourceId,
-            CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
-            System.currentTimeMillis(),SearchByEdgeType.Order.DESCENDING,
-            null)); // last
+        Observable<Edge> edgesFromSource = gm.loadEdgesFromSource(
+                new SimpleSearchByEdgeType( sourceId, CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
+                        System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) ); // last
 
         int count = edgesFromSource.take( 2 ).count().toBlocking().last();
 
         return count > 1;
-   } 
+    }
 
 
     @Override
@@ -540,10 +509,10 @@ public class CpRelationManager implements RelationManager {
 
         final Set<String> indexes = new HashSet<String>();
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        Observable<String> str = gm.getEdgeTypesFromSource( 
-                new SimpleSearchEdgeType( cpHeadEntity.getId(), null , null ));
+        Observable<String> str =
+                gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
 
         Iterator<String> iter = str.toBlockingObservable().getIterator();
         while ( iter.hasNext() ) {
@@ -552,52 +521,53 @@ public class CpRelationManager implements RelationManager {
         }
 
         return indexes;
-
     }
 
+
     @Override
-    public Results getCollection(String collectionName, UUID startResult, int count, 
-            Level resultsLevel, boolean reversed) throws Exception {
+    public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
+                                  boolean reversed ) throws Exception {
 
-        Query query = Query.fromQL("select *");
-        query.setLimit(count);
-        query.setReversed(reversed);
+        Query query = Query.fromQL( "select *" );
+        query.setLimit( count );
+        query.setReversed( reversed );
 
         if ( startResult != null ) {
-            query.addGreaterThanEqualFilter("created", startResult.timestamp());
+            query.addGreaterThanEqualFilter( "created", startResult.timestamp() );
         }
 
-        return searchCollection(collectionName, query);
+        return searchCollection( collectionName, query );
     }
 
+
     @Override
-    public Results getCollection(
-            String collName, Query query, Level level) throws Exception {
+    public Results getCollection( String collName, Query query, Level level ) throws Exception {
 
-        return searchCollection(collName, query);
+        return searchCollection( collName, query );
     }
 
 
     // add to a named collection of the head entity
     @Override
-    public Entity addToCollection(String collName, EntityRef itemRef) throws Exception {
-       
+    public Entity addToCollection( String collName, EntityRef itemRef ) throws Exception {
+
         CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName );
         if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) {
             return null;
         }
 
-        return addToCollection( collName, itemRef, 
-                (collection != null && collection.getLinkedCollection() != null) );
+        return addToCollection( collName, itemRef, ( collection != null && collection.getLinkedCollection() != null ) );
     }
 
-    public Entity addToCollection(String collName, EntityRef itemRef, boolean connectBack ) throws Exception {
+
+    public Entity addToCollection( String collName, EntityRef itemRef, boolean connectBack ) throws Exception {
 
         // don't fetch entity if we've already got one
         final Entity itemEntity;
         if ( itemRef instanceof Entity ) {
-            itemEntity = (Entity)itemRef;
-        } else {
+            itemEntity = ( Entity ) itemRef;
+        }
+        else {
             itemEntity = em.get( itemRef );
         }
 
@@ -611,70 +581,61 @@ public class CpRelationManager implements RelationManager {
         }
 
         // load the new member entity to be added to the collection from its default scope
-        CollectionScope memberScope = new CollectionScopeImpl( 
-            applicationScope.getApplication(), 
-            applicationScope.getApplication(),
-            CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ));
+        CollectionScope memberScope =
+                new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
 
-        EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager(memberScope);
+        EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager( memberScope );
 
         //TODO, this double load should disappear once events are in
-        org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
-            new SimpleId( itemRef.getUuid(), itemRef.getType() )).toBlocking().last();
+        org.apache.usergrid.persistence.model.entity.Entity memberEntity =
+                memberMgr.load( new SimpleId( itemRef.getUuid(), itemRef.getType() ) ).toBlocking().last();
 
         if ( memberEntity == null ) {
-            throw new RuntimeException("Unable to load entity uuid=" 
-                + itemRef.getUuid() + " type=" + itemRef.getType());
+            throw new RuntimeException(
+                    "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
         }
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug("Loaded member entity {}:{} from scope\n   app {}\n   "
-                    + "owner {}\n   name {} data {}", 
-                new Object[] { 
-                    itemRef.getType(), 
-                    itemRef.getUuid(), 
-                    memberScope.getApplication(), 
-                    memberScope.getOwner(), 
-                    memberScope.getName(),
-                    CpEntityMapUtils.toMap( memberEntity )
-            });
+            logger.debug( "Loaded member entity {}:{} from scope\n   app {}\n   " + "owner {}\n   name {} data {}",
+                    new Object[] {
+                            itemRef.getType(), itemRef.getUuid(), memberScope.getApplication(), memberScope.getOwner(),
+                            memberScope.getName(), CpEntityMapUtils.toMap( memberEntity )
+                    } );
         }
 
         String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName );
 
-        UUID timeStampUuid =   memberEntity.getId().getUuid() != null 
-                &&  UUIDUtils.isTimeBased( memberEntity.getId().getUuid()) 
-                ?  memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID();
+        UUID timeStampUuid =
+                memberEntity.getId().getUuid() != null && UUIDUtils.isTimeBased( memberEntity.getId().getUuid() ) ?
+                memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID();
 
-        long uuidHash =    UUIDUtils.getUUIDLong(timeStampUuid);
+        long uuidHash = UUIDUtils.getUUIDLong( timeStampUuid );
 
         // create graph edge connection from head entity to member entity
-        Edge edge = new SimpleEdge(
-            cpHeadEntity.getId(),
-            edgeType,
-            memberEntity.getId(),
-           uuidHash);
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-        gm.writeEdge(edge).toBlockingObservable().last();
+        Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+        gm.writeEdge( edge ).toBlockingObservable().last();
 
-        logger.debug("Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}", new Object[] { 
-            edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
-            memberEntity.getId().getType(), memberEntity.getId().getUuid(),
-            applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()});  
+        logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}", new Object[] {
+                edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+                memberEntity.getId().getType(), memberEntity.getId().getUuid(),
+                applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()
+        } );
 
-        ((CpEntityManager)em).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
+        ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
 
-        logger.debug("Added entity {}:{} to collection {}", new Object[] { 
-            itemRef.getUuid().toString(), itemRef.getType(), collName }); 
+        logger.debug( "Added entity {}:{} to collection {}", new Object[] {
+                itemRef.getUuid().toString(), itemRef.getType(), collName
+        } );
 
-//        logger.debug("With head entity scope is {}:{}:{}", new Object[] { 
-//            headEntityScope.getApplication().toString(), 
-//            headEntityScope.getOwner().toString(),
-//            headEntityScope.getName()}); 
+        //        logger.debug("With head entity scope is {}:{}:{}", new Object[] {
+        //            headEntityScope.getApplication().toString(),
+        //            headEntityScope.getOwner().toString(),
+        //            headEntityScope.getName()});
 
         if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
-            getRelationManager( itemEntity )
-                .addToCollection( collection.getLinkedCollection(), headEntity, false );
+            getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false );
         }
 
         return itemEntity;
@@ -682,11 +643,11 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    public Entity addToCollections(List<EntityRef> owners, String collName) throws Exception {
+    public Entity addToCollections( List<EntityRef> owners, String collName ) throws Exception {
 
         // TODO: this addToCollections() implementation seems wrong.
         for ( EntityRef eref : owners ) {
-            addToCollection( collName, eref ); 
+            addToCollection( collName, eref );
         }
 
         return null;
@@ -694,9 +655,9 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    @Metered(group = "core", name = "RelationManager_createItemInCollection")
-    public Entity createItemInCollection(
-        String collName, String itemType, Map<String, Object> properties) throws Exception {
+    @Metered( group = "core", name = "RelationManager_createItemInCollection" )
+    public Entity createItemInCollection( String collName, String itemType, Map<String, Object> properties )
+            throws Exception {
 
         if ( headEntity.getUuid().equals( applicationId ) ) {
             if ( itemType.equals( TYPE_ENTITY ) ) {
@@ -714,14 +675,13 @@ public class CpRelationManager implements RelationManager {
             return em.create( itemType, properties );
         }
 
-        else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) 
-                && ( collName.equals( COLLECTION_ROLES ) ) ) {
+        else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collName.equals( COLLECTION_ROLES ) ) ) {
             UUID groupId = headEntity.getUuid();
             String roleName = ( String ) properties.get( PROPERTY_NAME );
             return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) );
         }
 
-        CollectionInfo collection = getDefaultSchema().getCollection(headEntity.getType(),collName);
+        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName );
         if ( ( collection != null ) && !collection.getType().equals( itemType ) ) {
             return null;
         }
@@ -735,16 +695,16 @@ public class CpRelationManager implements RelationManager {
             addToCollection( collName, itemEntity );
 
             if ( collection != null && collection.getLinkedCollection() != null ) {
-                getRelationManager(  getHeadEntity() )
-                    .addToCollection( collection.getLinkedCollection(),itemEntity);
+                getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity );
             }
         }
 
-        return itemEntity;  
+        return itemEntity;
     }
 
+
     @Override
-    public void removeFromCollection(String collName, EntityRef itemRef) throws Exception {
+    public void removeFromCollection( String collName, EntityRef itemRef ) throws Exception {
 
         // special handling for roles collection of the application
         if ( headEntity.getUuid().equals( applicationId ) ) {
@@ -760,102 +720,86 @@ public class CpRelationManager implements RelationManager {
             }
             em.delete( itemRef );
             return;
-       }
+        }
 
         // load the entity to be removed to the collection
-        CollectionScope memberScope = new CollectionScopeImpl( 
-            this.applicationScope.getApplication(), 
-            this.applicationScope.getApplication(), 
-            CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ));
-        EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager(memberScope);
+        CollectionScope memberScope =
+                new CollectionScopeImpl( this.applicationScope.getApplication(), this.applicationScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
+        EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager( memberScope );
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug("Loading entity to remove from collection "
-                    + "{}:{} from scope\n   app {}\n   owner {}\n   name {}", 
-                new Object[] { 
-                    itemRef.getType(), 
-                    itemRef.getUuid(), 
-                    memberScope.getApplication(), 
-                    memberScope.getOwner(), 
-                    memberScope.getName() 
-            });
+            logger.debug( "Loading entity to remove from collection "
+                            + "{}:{} from scope\n   app {}\n   owner {}\n   name {}", new Object[] {
+                            itemRef.getType(), itemRef.getUuid(), memberScope.getApplication(), memberScope.getOwner(),
+                            memberScope.getName()
+                    } );
         }
 
-        org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
-            new SimpleId( itemRef.getUuid(), itemRef.getType() )).toBlockingObservable().last();
+        org.apache.usergrid.persistence.model.entity.Entity memberEntity =
+                memberMgr.load( new SimpleId( itemRef.getUuid(), itemRef.getType() ) ).toBlockingObservable().last();
 
-        final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+        final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
         final EntityIndexBatch batch = ei.createBatch();
 
         // remove item from collection index
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(), 
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
+        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
 
-        batch.deindex(indexScope,  memberEntity );
+        batch.deindex( indexScope, memberEntity );
 
         // remove collection from item index 
-        IndexScope itemScope = new IndexScopeImpl(
-            memberEntity.getId(), 
-            CpNamingUtils.getCollectionScopeNameFromCollectionName(
-                    Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ));
+        IndexScope itemScope = new IndexScopeImpl( memberEntity.getId(), CpNamingUtils
+                .getCollectionScopeNameFromCollectionName(
+                        Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) );
 
 
-        batch.deindex(itemScope,  cpHeadEntity );
+        batch.deindex( itemScope, cpHeadEntity );
 
         batch.execute();
 
         // remove edge from collection to item 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-        Edge collectionToItemEdge = new SimpleEdge( 
-            cpHeadEntity.getId(),
-             CpNamingUtils.getEdgeTypeFromCollectionName( collName),
-            memberEntity.getId(),
-                UUIDUtils.getUUIDLong(memberEntity.getId().getUuid())
-            );
-        gm.deleteEdge(collectionToItemEdge).toBlockingObservable().last();
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+        Edge collectionToItemEdge =
+                new SimpleEdge( cpHeadEntity.getId(), CpNamingUtils.getEdgeTypeFromCollectionName( collName ),
+                        memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) );
+        gm.deleteEdge( collectionToItemEdge ).toBlockingObservable().last();
 
         // remove edge from item to collection
-        Edge itemToCollectionEdge = new SimpleEdge( 
-            memberEntity.getId(), 
-                CpNamingUtils
-                        .getEdgeTypeFromCollectionName( Schema.defaultCollectionName( cpHeadEntity.getId().getType() )),
-            cpHeadEntity.getId(),
-            UUIDUtils.getUUIDLong(cpHeadEntity.getId().getUuid()));
-        gm.deleteEdge(itemToCollectionEdge).toBlockingObservable().last();
+        Edge itemToCollectionEdge = new SimpleEdge( memberEntity.getId(), CpNamingUtils
+                .getEdgeTypeFromCollectionName( Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ),
+                cpHeadEntity.getId(), UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) );
+        gm.deleteEdge( itemToCollectionEdge ).toBlockingObservable().last();
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
 
             if ( collName.equals( COLLECTION_ROLES ) ) {
-                String path = (String)( (Entity)itemRef ).getMetadata( "path" );
+                String path = ( String ) ( ( Entity ) itemRef ).getMetadata( "path" );
 
                 if ( path.startsWith( "/roles/" ) ) {
 
-                    Entity itemEntity = em.get( new SimpleEntityRef( 
-                        memberEntity.getId().getType(), memberEntity.getId().getUuid() ) );
+                    Entity itemEntity = em.get( new SimpleEntityRef( memberEntity.getId().getType(),
+                            memberEntity.getId().getUuid() ) );
 
                     RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
                     em.deleteRole( roleRef.getApplicationRoleName() );
                 }
-
-            } 
+            }
         }
     }
 
 
     @Override
-    public void copyRelationships(String srcRelationName, EntityRef dstEntityRef, 
-            String dstRelationName) throws Exception {
+    public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
+            throws Exception {
 
         headEntity = em.validate( headEntity );
         dstEntityRef = em.validate( dstEntityRef );
 
-        CollectionInfo srcCollection = 
-                getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
+        CollectionInfo srcCollection = getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
 
-        CollectionInfo dstCollection = 
-                getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
+        CollectionInfo dstCollection = getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
 
         Results results = null;
         do {
@@ -881,8 +825,9 @@ public class CpRelationManager implements RelationManager {
         while ( ( results != null ) && ( results.hasMoreResults() ) );
     }
 
+
     @Override
-    public Results searchCollection(String collName, Query query) throws Exception {
+    public Results searchCollection( String collName, Query query ) throws Exception {
 
         if ( query == null ) {
             query = new Query();
@@ -890,24 +835,22 @@ public class CpRelationManager implements RelationManager {
 
         headEntity = em.validate( headEntity );
 
-        CollectionInfo collection = 
-            getDefaultSchema().getCollection( headEntity.getType(), collName );
+        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName );
 
         if ( collection == null ) {
-            throw new RuntimeException("Cannot find collection-info for '"+collName
-                +"' of "+ headEntity.getType() +":"+headEntity.getUuid() );
+            throw new RuntimeException(
+                    "Cannot find collection-info for '" + collName + "' of " + headEntity.getType() + ":" + headEntity
+                            .getUuid() );
         }
 
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(), 
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
+        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
 
-        EntityIndex ei = managerCache.getEntityIndex(applicationScope);
-      
-        logger.debug("Searching scope {}:{}",
+        EntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
-                indexScope.getOwner().toString(),
-                indexScope.getName() );
+        logger.debug( "Searching scope {}:{}",
+
+                indexScope.getOwner().toString(), indexScope.getName() );
 
         query.setEntityType( collection.getType() );
         query = adjustQuery( query );
@@ -929,22 +872,22 @@ public class CpRelationManager implements RelationManager {
             CandidateResults crs = ei.search( indexScope, query );
 
             if ( results == null ) {
-                logger.debug("Calling build results 1");
+                logger.debug( "Calling build results 1" );
                 results = buildResults( query, crs, collName );
-
-            } else {
-                logger.debug("Calling build results 2");
+            }
+            else {
+                logger.debug( "Calling build results 2" );
                 Results newResults = buildResults( query, crs, collName );
                 results.merge( newResults );
             }
 
             if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
                 satisfied = true;
-
-            } else if ( results.size() == originalLimit )  { // got what we need
+            }
+            else if ( results.size() == originalLimit ) { // got what we need
                 satisfied = true;
-
-            } else if ( crs.hasCursor() ) {
+            }
+            else if ( crs.hasCursor() ) {
                 satisfied = false;
 
                 // need to query for more
@@ -952,10 +895,10 @@ public class CpRelationManager implements RelationManager {
                 query.setCursor( results.getCursor() );
                 query.setLimit( originalLimit - results.size() );
 
-                logger.warn("Satisfy query limit {}, new limit {} query count {}", new Object[] {
-                    originalLimit, query.getLimit(), queryCount 
-                });
-            } 
+                logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+                        originalLimit, query.getLimit(), queryCount
+                } );
+            }
         }
 
         return results;
@@ -963,78 +906,60 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_connection_ref")
+    @Metered( group = "core", name = "RelationManager_createConnection_connection_ref" )
     public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
-        
+
         return createConnection( connection.getConnectionType(), connection.getConnectedEntity() );
     }
 
 
     @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_connectionType")
-    public ConnectionRef createConnection( 
-            String connectionType, EntityRef connectedEntityRef ) throws Exception {
+    @Metered( group = "core", name = "RelationManager_createConnection_connectionType" )
+    public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {
 
         headEntity = em.validate( headEntity );
         connectedEntityRef = em.validate( connectedEntityRef );
 
-        ConnectionRefImpl connection = new ConnectionRefImpl( 
-            headEntity, connectionType, connectedEntityRef );
+        ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
 
-        CollectionScope targetScope = new CollectionScopeImpl( 
-            applicationScope.getApplication(), 
-            applicationScope.getApplication(), 
-            CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ));
+        CollectionScope targetScope =
+                new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ) );
 
-        EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager(targetScope);
+        EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager( targetScope );
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug("createConnection(): "
-                    + "Indexing connection type '{}'\n   from source {}:{}]\n"
-                    + "   to target {}:{}\n   from scope\n   app {}\n   owner {}\n   name {}", 
-                new Object[] { 
-                    connectionType,
-                    headEntity.getType(), 
-                    headEntity.getUuid(), 
-                    connectedEntityRef.getType(), 
-                    connectedEntityRef.getUuid(), 
-                    targetScope.getApplication(), 
-                    targetScope.getOwner(), 
-                    targetScope.getName() 
-            });
-        }
-
-        org.apache.usergrid.persistence.model.entity.Entity targetEntity = targetEcm.load(
-            new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ))
-                .toBlockingObservable().last();
-
-        String edgeType = CpNamingUtils
-                .getEdgeTypeFromConnectionType( connectionType );
+            logger.debug( "createConnection(): " + "Indexing connection type '{}'\n   from source {}:{}]\n"
+                            + "   to target {}:{}\n   from scope\n   app {}\n   owner {}\n   name {}", new Object[] {
+                            connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(),
+                            connectedEntityRef.getUuid(), targetScope.getApplication(), targetScope.getOwner(),
+                            targetScope.getName()
+                    } );
+        }
+
+        org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+                targetEcm.load( new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ) )
+                         .toBlockingObservable().last();
+
+        String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType );
 
         // create graph edge connection from head entity to member entity
-        Edge edge = new SimpleEdge( 
-            cpHeadEntity.getId(), 
-            edgeType,
-            targetEntity.getId(), 
-            System.currentTimeMillis() );
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-        gm.writeEdge(edge).toBlockingObservable().last();
-
-        EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+        Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() );
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+        gm.writeEdge( edge ).toBlockingObservable().last();
+
+        EntityIndex ei = managerCache.getEntityIndex( applicationScope );
         EntityIndexBatch batch = ei.createBatch();
 
         // Index the new connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(), 
-            CpNamingUtils.getConnectionScopeName( connectedEntityRef.getType(), connectionType ));
+        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+                CpNamingUtils.getConnectionScopeName( connectedEntityRef.getType(), connectionType ) );
 
-        batch.index(indexScope, targetEntity );
+        batch.index( indexScope, targetEntity );
 
         // Index the new connection in app|scope|all-types context
-        IndexScope allTypesIndexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(), 
-                CpNamingUtils.ALL_TYPES);
-        batch.index(allTypesIndexScope,  targetEntity );
+        IndexScope allTypesIndexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.ALL_TYPES );
+        batch.index( allTypesIndexScope, targetEntity );
 
 
         batch.execute();
@@ -1047,16 +972,17 @@ public class CpRelationManager implements RelationManager {
         return connection;
     }
 
-    
-    @SuppressWarnings("unchecked")
-    @Metered(group = "core", name = "CpRelationManager_batchUpdateEntityConnection")
-    public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch, 
-        boolean disconnect, ConnectionRefImpl connection, UUID timestampUuid ) throws Exception {
+
+    @SuppressWarnings( "unchecked" )
+    @Metered( group = "core", name = "CpRelationManager_batchUpdateEntityConnection" )
+    public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch, boolean disconnect,
+                                                            ConnectionRefImpl connection, UUID timestampUuid )
+            throws Exception {
 
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Entity connectedEntity = em.get( new SimpleEntityRef( 
-                connection.getConnectedEntityType(), connection.getConnectedEntityId()) );
+        Entity connectedEntity =
+                em.get( new SimpleEntityRef( connection.getConnectedEntityType(), connection.getConnectedEntityId() ) );
 
         if ( connectedEntity == null ) {
             return batch;
@@ -1065,64 +991,60 @@ public class CpRelationManager implements RelationManager {
         // Create connection for requested params
 
         if ( disconnect ) {
-            
+
             addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
-                    connection.getConnectionType() ),
-                asList( connection.getConnectedEntityId(), 
-                        connection.getConnectedEntityType() ), timestamp );
+                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
+                            connection.getConnectionType() ),
+                    asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp );
 
             addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
-                    connection.getConnectionType() ),
-                asList( connection.getConnectingEntityId(), 
-                        connection.getConnectingEntityType() ), timestamp );
+                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
+                            connection.getConnectionType() ),
+                    asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp );
 
             // delete the connection path if there will be no connections left
 
             // check out outbound edges of the given type.  If we have more than the 1 specified,
             // we shouldn't delete the connection types from our outbound index
-            if ( !moreThanOneOutboundConnection( 
-                connection.getConnectingEntity(), connection.getConnectionType() ) ) {
+            if ( !moreThanOneOutboundConnection( connection.getConnectingEntity(), connection.getConnectionType() ) ) {
 
                 addDeleteToMutator( batch, ENTITY_DICTIONARIES,
-                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
-                    connection.getConnectionType(), timestamp );
+                        key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
+                        connection.getConnectionType(), timestamp );
             }
 
             //check out inbound edges of the given type.  If we have more than the 1 specified,
             // we shouldn't delete the connection types from our outbound index
-            if ( !moreThanOneInboundConnection( 
-               connection.getConnectingEntity(), connection.getConnectionType() ) ) {
+            if ( !moreThanOneInboundConnection( connection.getConnectingEntity(), connection.getConnectionType() ) ) {
 
                 addDeleteToMutator( batch, ENTITY_DICTIONARIES,
                         key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
                         connection.getConnectionType(), timestamp );
             }
-
-        } else {
+        }
+        else {
 
             addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
-                    connection.getConnectionType() ),
-                asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), 
-                    timestamp, timestamp );
+                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
+                            connection.getConnectionType() ),
+                    asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp,
+                    timestamp );
 
             addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
-                    connection.getConnectionType() ),
-                asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), 
-                    timestamp, timestamp );
+                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
+                            connection.getConnectionType() ),
+                    asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp,
+                    timestamp );
 
             // Add connection type to connections set
             addInsertToMutator( batch, ENTITY_DICTIONARIES,
-                key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
-                connection.getConnectionType(), null, timestamp );
+                    key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
+                    connection.getConnectionType(), null, timestamp );
 
             // Add connection type to connections set
             addInsertToMutator( batch, ENTITY_DICTIONARIES,
-                key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
-                connection.getConnectionType(), null, timestamp );
+                    key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
+                    connection.getConnectionType(), null, timestamp );
         }
 
         // Add indexes for the connected entity's list properties
@@ -1137,15 +1059,15 @@ public class CpRelationManager implements RelationManager {
 
         for ( String dictionaryName : dictionaryNames ) {
             boolean has_dictionary = schema.hasDictionary( connectedEntity.getType(), dictionaryName );
-            boolean dictionary_indexed = schema.isDictionaryIndexedInConnections( 
-                connectedEntity.getType(), dictionaryName );
+            boolean dictionary_indexed =
+                    schema.isDictionaryIndexedInConnections( connectedEntity.getType(), dictionaryName );
 
             if ( dictionary_indexed || !has_dictionary ) {
                 Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName );
                 for ( Object elementValue : elementValues ) {
                     IndexUpdate indexUpdate =
-                        batchStartIndexUpdate( batch, connectedEntity, dictionaryName, 
-                            elementValue, timestampUuid, has_dictionary, true, disconnect, false );
+                            batchStartIndexUpdate( batch, connectedEntity, dictionaryName, elementValue, timestampUuid,
+                                    has_dictionary, true, disconnect, false );
                     batchUpdateConnectionIndex( indexUpdate, connection );
                 }
             }
@@ -1156,127 +1078,118 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_paired_connection_type")
-    public ConnectionRef createConnection( 
-            String pairedConnectionType, EntityRef pairedEntity, String connectionType,
-            EntityRef connectedEntityRef ) throws Exception {
-        
-        throw new UnsupportedOperationException("Paired connections not supported"); 
+    @Metered( group = "core", name = "RelationManager_createConnection_paired_connection_type" )
+    public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
+                                           EntityRef connectedEntityRef ) throws Exception {
+
+        throw new UnsupportedOperationException( "Paired connections not supported" );
     }
 
 
     @Override
-    @Metered(group = "core", name = "RelationManager_createConnection_connected_entity_ref")
+    @Metered( group = "core", name = "RelationManager_createConnection_connected_entity_ref" )
     public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
 
-        throw new UnsupportedOperationException("Paired connections not supported"); 
+        throw new UnsupportedOperationException( "Paired connections not supported" );
     }
 
+
     @Override
-    public ConnectionRef connectionRef(
-            String connectionType, 
-            EntityRef connectedEntityRef) throws Exception {
+    public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
 
-        ConnectionRef connection = new ConnectionRefImpl( 
-                headEntity, connectionType, connectedEntityRef );
+        ConnectionRef connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
 
         return connection;
     }
 
+
     @Override
-    public ConnectionRef connectionRef(String pairedConnectionType, EntityRef pairedEntity, 
-            String connectionType, EntityRef connectedEntityRef) throws Exception {
+    public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
+                                        EntityRef connectedEntityRef ) throws Exception {
 
-        throw new UnsupportedOperationException("Paired connections not supported"); 
+        throw new UnsupportedOperationException( "Paired connections not supported" );
     }
 
+
     @Override
-    public ConnectionRef connectionRef(ConnectedEntityRef... connections) {
+    public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
 
-        throw new UnsupportedOperationException("Paired connections not supported"); 
+        throw new UnsupportedOperationException( "Paired connections not supported" );
     }
 
+
     @Override
-    public void deleteConnection(ConnectionRef connectionRef) throws Exception {
-       
+    public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
+
         // First, clean up the dictionary records of the connection
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
         Mutator<ByteBuffer> m = createMutator( ko, be );
-        batchUpdateEntityConnection( 
-            m, true, (ConnectionRefImpl)connectionRef, UUIDGenerator.newTimeUUID() );
+        batchUpdateEntityConnection( m, true, ( ConnectionRefImpl ) connectionRef, UUIDGenerator.newTimeUUID() );
         batchExecute( m, CassandraService.RETRY_COUNT );
 
         EntityRef connectingEntityRef = connectionRef.getConnectingEntity();  // source
         EntityRef connectedEntityRef = connectionRef.getConnectedEntity();  // target
 
         String connectionType = connectionRef.getConnectedEntity().getConnectionType();
-        
-        CollectionScope targetScope = new CollectionScopeImpl( 
-            applicationScope.getApplication(), 
-            applicationScope.getApplication(), 
-            CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ));
 
-        EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager(targetScope);
+        CollectionScope targetScope =
+                new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ) );
+
+        EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager( targetScope );
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug("Deleting connection '{}' from source {}:{} \n   to target {}:{}",
-                new Object[] { 
-                    connectionType,
-                    connectingEntityRef.getType(), 
-                    connectingEntityRef.getUuid(), 
-                    connectedEntityRef.getType(), 
-                    connectedEntityRef.getUuid()
-            });
+            logger.debug( "Deleting connection '{}' from source {}:{} \n   to target {}:{}", new Object[] {
+                            connectionType, connectingEntityRef.getType(), connectingEntityRef.getUuid(),
+                            connectedEntityRef.getType(), connectedEntityRef.getUuid()
+                    } );
         }
 
-        org.apache.usergrid.persistence.model.entity.Entity targetEntity = targetEcm.load(
-            new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ))
-                .toBlockingObservable().last();
+        org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+                targetEcm.load( new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ) )
+                         .toBlockingObservable().last();
 
         // Delete graph edge connection from head entity to member entity
-        Edge edge = new SimpleEdge( 
-            new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
-            connectionType,
-            targetEntity.getId(), 
-            System.currentTimeMillis() );
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-        gm.deleteEdge(edge).toBlockingObservable().last();
-
-        final EntityIndex ei = managerCache.getEntityIndex( applicationScope )  ;
+        Edge edge = new SimpleEdge( new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
+                connectionType, targetEntity.getId(), System.currentTimeMillis() );
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+        gm.deleteEdge( edge ).toBlockingObservable().last();
+
+        final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
         final EntityIndexBatch batch = ei.createBatch();
 
         // Deindex the connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-            new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
-            CpNamingUtils.getConnectionScopeName( targetEntity.getId().getType(), connectionType ));
-        batch.deindex( indexScope , targetEntity );
+        IndexScope indexScope =
+                new IndexScopeImpl( new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
+                        CpNamingUtils.getConnectionScopeName( targetEntity.getId().getType(), connectionType ) );
+        batch.deindex( indexScope, targetEntity );
 
         // Deindex the connection in app|source|type context
-        IndexScope allTypesIndexScope = new IndexScopeImpl(
-            new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
-                CpNamingUtils.ALL_TYPES);
+        IndexScope allTypesIndexScope =
+                new IndexScopeImpl( new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
+                        CpNamingUtils.ALL_TYPES );
 
-        batch.deindex( allTypesIndexScope,  targetEntity );
+        batch.deindex( allTypesIndexScope, targetEntity );
 
         batch.execute();
-
     }
 
 
     @Override
-    public Set<String> getConnectionTypes(UUID connectedEntityId) throws Exception {
-        throw new UnsupportedOperationException("Cannot specify entity by UUID alone."); 
+    public Set<String> getConnectionTypes( UUID connectedEntityId ) throws Exception {
+        throw new UnsupportedOperationException( "Cannot specify entity by UUID alone." );
     }
 
+
     @Override
     public Set<String> getConnectionTypes() throws Exception {
         return getConnectionTypes( false );
     }
 
+
     @Override
-    public Set<String> getConnectionTypes(boolean filterConnection) throws Exception {
-        Set<String> connections = cast( 
-                em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );
+    public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {
+        Set<String> connections = cast( em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );
 
         if ( connections == null ) {
             return null;
@@ -1289,47 +1202,45 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    public Results getConnectedEntities( 
-        String connectionType, String connectedEntityType, Level level) throws Exception {
+    public Results getConnectedEntities( String connectionType, String connectedEntityType, Level level )
+            throws Exception {
 
         Results raw = null;
 
         Query query = new Query();
-        query.setConnectionType(connectionType);
-        query.setEntityType(connectedEntityType);
+        query.setConnectionType( connectionType );
+        query.setEntityType( connectedEntityType );
 
         if ( connectionType == null ) {
             raw = searchConnectedEntities( query );
-
-        } else {
+        }
+        else {
 
             headEntity = em.validate( headEntity );
 
             String scopeName = null;
             if ( connectedEntityType != null ) {
                 scopeName = CpNamingUtils.getConnectionScopeName( connectedEntityType, connectionType );
-            } else {
+            }
+            else {
                 scopeName = CpNamingUtils.ALL_TYPES;
             }
 
-            IndexScope indexScope = new IndexScopeImpl(
-                cpHeadEntity.getId(), 
-                scopeName);
+            IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), scopeName );
 
-            final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+            final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
-        
-            logger.debug("Searching connected entities from scope {}:{}",
-                indexScope.getOwner().toString(),
-                indexScope.getName());
+
+            logger.debug( "Searching connected entities from scope {}:{}", indexScope.getOwner().toString(),
+                    indexScope.getName() );
 
             query = adjustQuery( query );
             CandidateResults crs = ei.search( indexScope, query );
 
-            raw = buildResults( query , crs, query.getConnectionType() );
+            raw = buildResults( query, crs, query.getConnectionType() );
         }
 
-        if ( Level.ALL_PROPERTIES.equals(level ) ) {
+        if ( Level.ALL_PROPERTIES.equals( level ) ) {
             List<Entity> entities = new ArrayList<Entity>();
             for ( EntityRef ref : raw.getEntities() ) {
                 Entity entity = em.get( ref );
@@ -1349,29 +1260,28 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    public Results getConnectingEntities(
-            String connType, String fromEntityType, Level resultsLevel) throws Exception {
+    public Results getConnectingEntities( String connType, String fromEntityType, Level resultsLevel )
+            throws Exception {
 
         return getConnectingEntities( connType, fromEntityType, resultsLevel, -1 );
     }
 
+
     @Override
-    public Results getConnectingEntities(
-            String connType, String fromEntityType, Level level, int count) throws Exception {
+    public Results getConnectingEntities( String connType, String fromEntityType, Level level, int count )
+            throws Exception {
 
         // looking for edges to the head entity
-        String edgeType = 
-                CpNamingUtils.getEdgeTypeFromConnectionType( connType );
+        String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connType );
 
-        Map<EntityRef, Set<String>> containers = 
-            getContainers( count, edgeType, fromEntityType );
+        Map<EntityRef, Set<String>> containers = getContainers( count, edgeType, fromEntityType );
 
-        if ( Level.REFS.equals(level ) ) {
+        if ( Level.REFS.equals( level ) ) {
             List<EntityRef> refList = new ArrayList<EntityRef>( containers.keySet() );
             return Results.fromRefList( refList );
-        } 
+        }
 
-        if ( Level.IDS.equals(level ) ) {
+        if ( Level.IDS.equals( level ) ) {
             // TODO: someday this should return a list of Core Persistence Ids
             List<UUID> idList = new ArrayList<UUID>();
             for ( EntityRef ref : containers.keySet() ) {
@@ -1383,7 +1293,7 @@ public class CpRelationManager implements RelationManager {
         List<Entity> entities = new ArrayList<Entity>();
         for ( EntityRef ref : containers.keySet() ) {
             Entity entity = em.get( ref );
-            logger.debug("   Found connecting entity: " + entity.getProperties());
+            logger.debug( "   Found connecting entity: " + entity.getProperties() );
             entities.add( entity );
         }
         return Results.fromEntities( entities );
@@ -1402,35 +1312,30 @@ public class CpRelationManager implements RelationManager {
         if ( query.getEntityType() == null ) {
 
             // search across all types of collections of the head-entity
-            IndexScope indexScope = new IndexScopeImpl(
-                cpHeadEntity.getId(), 
-                    CpNamingUtils.ALL_TYPES);
+            IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.ALL_TYPES );
+
+            EntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
-            EntityIndex ei = managerCache.getEntityIndex(applicationScope);
-        
-            logger.debug("Searching connections from the all-types scope {}:{}",
-                indexScope.getOwner().toString(),
-                indexScope.getName());
+            logger.debug( "Searching connections from the all-types scope {}:{}", indexScope.getOwner().toString(),
+                    indexScope.getName() );
 
             query = adjustQuery( query );
-            CandidateResults crs = ei.search(indexScope,  query );
+            CandidateResults crs = ei.search( indexScope, query );
 
-            return buildConnectionResults(query , crs, query.getConnectionType() );
+            return buildConnectionResults( query, crs, query.getConnectionType() );
         }
 
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(), 
-            CpNamingUtils.getConnectionScopeName( query.getEntityType(), query.getConnectionType() ));
-        EntityIndex ei = managerCache.getEntityIndex(applicationScope);
-    
-        logger.debug("Searching connections from the scope {}:{}",
-            indexScope.getOwner().toString(),
-            indexScope.getName());
+        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+                CpNamingUtils.getConnectionScopeName( query.getEntityType(), query.getConnectionType() ) );
+        EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+
+        logger.debug( "Searching connections from the scope {}:{}", indexScope.getOwner().toString(),
+                indexScope.getName() );
 
         query = adjustQuery( query );
         CandidateResults crs = ei.search( indexScope, query );
 
-        return buildConnectionResults(query , crs, query.getConnectionType() );
+        return buildConnectionResults( query, crs, query.getConnectionType() );
     }
 
 
@@ -1448,47 +1353,44 @@ public class CpRelationManager implements RelationManager {
                 // This is fulgy to put here, but required.
                 if ( query.getEntityType().equals( User.ENTITY_TYPE ) && ident.isEmail() ) {
 
-                    Query newQuery = Query.fromQL(
-                        "select * where email='" + query.getSingleNameOrEmailIdentifier()+ "'");
+                    Query newQuery =
+                            Query.fromQL( "select * where email='" + query.getSingleNameOrEmailIdentifier() + "'" );
                     query.setRootOperand( newQuery.getRootOperand() );
                 }
 
                 // use the ident with the default alias. could be an email
                 else {
 
-                    Query newQuery = Query.fromQL(
-                        "select * where name='" + query.getSingleNameOrEmailIdentifier()+ "'");
+                    Query newQuery =
+                            Query.fromQL( "select * where name='" + query.getSingleNameOrEmailIdentifier() + "'" );
                     query.setRootOperand( newQuery.getRootOperand() );
                 }
+            }
+            else if ( query.containsSingleUuidIdentifier() ) {
 
-            } else if ( query.containsSingleUuidIdentifier() ) {
-
-                Query newQuery = Query.fromQL(
-                        "select * where uuid='" + query.getSingleUuidIdentifier() + "'");
+                Query newQuery = Query.fromQL( "select * where uuid='" + query.getSingleUuidIdentifier() + "'" );
                 query.setRootOperand( newQuery.getRootOperand() );
             }
         }
 
         if ( query.isReversed() ) {
 
-            Query.SortPredicate desc = new Query.SortPredicate( 
-                PROPERTY_CREATED, Query.SortDirection.DESCENDING );
+    

<TRUNCATED>

[09/13] git commit: Added logging

Posted by to...@apache.org.
Added logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/83200bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/83200bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/83200bf5

Branch: refs/heads/cloudformation-update
Commit: 83200bf52f276c89f4351cf6fc8988414646507a
Parents: c31c553
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 19:29:01 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 19:30:08 2014 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/impl/EsEntityIndexImpl.java      | 5 ++++-
 .../services/notifications/AbstractServiceNotificationIT.java   | 4 ----
 2 files changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/83200bf5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 821f7b9..b562d8a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -88,7 +88,9 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     private final IndexFig config;
 
-    private static final int MAX_WAITS = 10;
+    //number of times to wait for the index to refresh propertly. Is an N+1, so 9 = 10
+    private static final int MAX_WAITS = 9;
+    //number of milliseconds to try again before sleeping
     private static final int WAIT_TIME = 250;
 
 
@@ -280,6 +282,7 @@ public class EsEntityIndexImpl implements EntityIndex {
         for ( int i = 0; i < MAX_WAITS; i++ ) {
             try {
                 client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+                log.debug( "Refreshed index: " + indexName );
                 return;
             }
             catch ( IndexMissingException e ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/83200bf5/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
index e16f111..a9ca955 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
@@ -41,11 +41,7 @@ import static org.junit.Assert.fail;
 
 public class AbstractServiceNotificationIT extends AbstractServiceIT {
     private NotificationsService ns;
-    @Autowired
-    private ServiceManagerFactory smf;
 
-    @Autowired
-    private EntityManagerFactory emf;
     @Rule
     public TestName name = new TestName();
 


[04/13] git commit: Changed refresh logic to catch an issue when the Index has not yet replicated across the cluster.

Posted by to...@apache.org.
Changed refresh logic to catch an issue when the Index has not yet replicated across the cluster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e5efa674
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e5efa674
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e5efa674

Branch: refs/heads/cloudformation-update
Commit: e5efa674bf71cda99d9958186962b0691ac4d6c2
Parents: 54b9a11
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 15:30:54 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 15:30:54 2014 -0600

----------------------------------------------------------------------
 .../index/impl/EsEntityIndexImpl.java           | 55 ++++++++++----------
 1 file changed, 27 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e5efa674/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 495b9e1..6ed9b86 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -124,34 +124,14 @@ public class EsEntityIndexImpl implements EntityIndex {
             }
             while ( response.getFailedShards() != 0 );
 
-            //now try to refresh, to ensure that it's recognized by everyone.  Occasionally we can get a success
-            //before we can write.
-            for(int i = 0 ; i < MAX_WAITS; i++ ){
-                try{
-                    refresh();
-                    break;
-
-                }catch(Exception e){
-                   log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
-                }
+            /**
+             * Immediately refresh to ensure the entire cluster is ready to receive this write.  Occasionally we see
+             * errors.  See this post.
+             * http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
+             *
+             */
+            refresh();
 
-                try {
-                    Thread.sleep( WAIT_TIME );
-                }
-                catch ( InterruptedException e ) {
-                    //swallow it
-                }
-            }
-
-            //
-            //            response.getFailedShards();
-            //
-            //            try {
-            //                // TODO: figure out what refresh above is not enough to ensure index is ready
-            //                Thread.sleep( 500 );
-            //            }
-            //            catch ( InterruptedException ex ) {
-            //            }
         }
         catch ( IndexAlreadyExistsException expected ) {
             // this is expected to happen if index already exists, it's a no-op and swallow
@@ -290,7 +270,26 @@ public class EsEntityIndexImpl implements EntityIndex {
 
 
     public void refresh() {
-        client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+
+            //now try to refresh, to ensure that it's recognized by everyone.  Occasionally we can get a success
+            //before we can write.
+            for(int i = 0 ; i < MAX_WAITS; i++ ){
+                try{
+                    client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+                    break;
+
+                }catch(Exception e){
+                   log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
+                }
+
+                try {
+                    Thread.sleep( WAIT_TIME );
+                }
+                catch ( InterruptedException e ) {
+                    //swallow it
+                }
+            }
+
         log.debug( "Refreshed index: " + indexName );
     }
 


[11/13] git commit: Explicitly made node a client

Posted by to...@apache.org.
Explicitly made node a client


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bd00ee00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bd00ee00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bd00ee00

Branch: refs/heads/cloudformation-update
Commit: bd00ee0012352b5cadecedbbccd92b8c5f2622dd
Parents: 722c43b
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 21:48:57 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 21:48:57 2014 -0600

----------------------------------------------------------------------
 .../java/org/apache/usergrid/persistence/index/impl/EsProvider.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd00ee00/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
index a9228ee..aac9ff5 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
@@ -163,6 +163,7 @@ public class EsProvider {
                     .put("client.transport.ping_timeout", 2000) // milliseconds
                     .put("client.transport.nodes_sampler_interval", 100)
                     .put("network.tcp.blocking", true)
+                    .put("node.client", true)
                     .put("node.name",  nodeName )
 
                     .build();


[13/13] git commit: Merge branch 'index-rebuild' into cloudformation-update

Posted by to...@apache.org.
Merge branch 'index-rebuild' into cloudformation-update


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/059a9523
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/059a9523
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/059a9523

Branch: refs/heads/cloudformation-update
Commit: 059a9523360eb543c3adfc42059a4c9e52e4efbb
Parents: 704c09d 8d9d4dc
Author: Todd Nine <to...@apache.org>
Authored: Fri Oct 17 08:57:14 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Fri Oct 17 08:57:14 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  107 +-
 .../corepersistence/CpEntityManagerFactory.java |   40 +-
 .../corepersistence/CpRelationManager.java      | 1427 ++++++++----------
 .../usergrid/corepersistence/CpVisitor.java     |   11 +-
 .../usergrid/corepersistence/CpWalker.java      |  157 +-
 .../usergrid/persistence/EntityManager.java     |    2 +-
 .../persistence/EntityManagerFactory.java       |    8 +-
 .../org/apache/usergrid/CoreApplication.java    |    1 -
 .../PerformanceEntityRebuildIndexTest.java      |   23 +-
 .../index/impl/EsEntityIndexImpl.java           |  146 +-
 .../persistence/index/impl/EsProvider.java      |   21 +-
 .../apache/usergrid/rest/SystemResource.java    |   50 +-
 .../AbstractServiceNotificationIT.java          |    4 -
 13 files changed, 954 insertions(+), 1043 deletions(-)
----------------------------------------------------------------------



[08/13] git commit: Fixed index refresh issue.

Posted by to...@apache.org.
Fixed index refresh issue.

Fixed source/target switch on entity rebuild

Added performance comments as encountered


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c31c553f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c31c553f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c31c553f

Branch: refs/heads/cloudformation-update
Commit: c31c553f9b3c16deda28fc0125d5b1a86a654d14
Parents: f807627
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 18:22:05 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 18:22:05 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |    6 +-
 .../corepersistence/CpEntityManagerFactory.java |   13 +-
 .../corepersistence/CpRelationManager.java      | 1427 ++++++++----------
 .../usergrid/corepersistence/CpWalker.java      |   11 +-
 .../PerformanceEntityRebuildIndexTest.java      |    3 +-
 .../index/impl/EsEntityIndexImpl.java           |   44 +-
 6 files changed, 692 insertions(+), 812 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 4015014..2498dda 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -514,7 +514,7 @@ public class CpEntityManager implements EntityManager {
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = (CpRelationManager)getRelationManager( entity );
-        rm.updateContainingCollectionAndCollectionIndexes( entity, cpEntity );
+        rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
     }
 
 
@@ -995,7 +995,7 @@ public class CpEntityManager implements EntityManager {
 //        }
 
         org.apache.usergrid.persistence.model.entity.Entity cpEntity =
-                ecm.load( entityId ).toBlockingObservable().last();
+                ecm.load( entityId ).toBlocking().last();
 
         cpEntity.removeField( propertyName );
 
@@ -1012,7 +1012,7 @@ public class CpEntityManager implements EntityManager {
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef );
-        rm.updateContainingCollectionAndCollectionIndexes( get( entityRef ), cpEntity );
+        rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 2514e20..e013957 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -147,6 +147,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 sysAppProps.put( PROPERTY_NAME, "systemapp");
                 em.create( SYSTEM_APP_ID, TYPE_APPLICATION, sysAppProps );
                 em.getApplication();
+                em.createIndex();
                 em.refreshIndex();
             }
 
@@ -200,7 +201,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private EntityManager _getEntityManager( UUID applicationId ) {
         EntityManager em = new CpEntityManager();
         em.init( this, applicationId );
-        //TODO T.N. Can we remove this?  Seems like we should fix our lifecycle instead...
+        //TODO PERFORMANCE  Can we remove this?  Seems like we should fix our lifecycle instead...
+        //if this is the first time we've loaded this entity manager in the JVM, create it's indexes, it may be new
+        //not sure how to handle other than this if the system dies after the application em has been created
+        //but before the create call can create the index
         em.createIndex();
         return em;
     }
@@ -286,7 +290,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityManager appEm = getEntityManager( applicationId );
 
         //create our ES index since we're initializing this application
-//  TODO T.N, pushed this down into the cache load      appEm.createIndex();
+//  TODO PERFORMANCE  pushed this down into the cache load can we do this here?
+//        appEm.createIndex();
 
         appEm.create( applicationId, TYPE_APPLICATION, properties );
         appEm.resetRoles();
@@ -681,10 +686,12 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception {
 
         EntityManager em = getEntityManager( appId );
+
+        //explicitly invoke create index, we don't know if it exists or not in ES during a rebuild.
+        em.createIndex();
         Application app = em.getApplication();
 
         em.reindex( po );
-//        em.refreshIndex();
 
         logger.info("\n\nRebuilt index for application {} id {}\n", app.getName(), appId );
     }


[05/13] git commit: Explicitly catch IndexMissingException

Posted by to...@apache.org.
Explicitly catch IndexMissingException


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f807627b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f807627b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f807627b

Branch: refs/heads/cloudformation-update
Commit: f807627b954b1554b0004c9e18555a867186657e
Parents: e5efa67
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 15:38:04 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 15:38:04 2014 -0600

----------------------------------------------------------------------
 .../apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f807627b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 6ed9b86..9ea14a1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.query.FilterBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.sort.FieldSortBuilder;
@@ -278,7 +279,7 @@ public class EsEntityIndexImpl implements EntityIndex {
                     client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
                     break;
 
-                }catch(Exception e){
+                }catch(IndexMissingException e){
                    log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
                 }