You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/11/20 23:48:23 UTC

[1/3] incubator-usergrid git commit: Refactored generation of collection scope to be re-used with CpNamingUtils for consistency

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-250-buffer-size-fix 68f4e0f1a -> e32d5212a


Refactored generation of collection scope to be re-used with CpNamingUtils for consistency


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9ed79642
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9ed79642
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9ed79642

Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: 9ed796423f15a8ef33137b113f6e04c578994b21
Parents: 68f4e0f
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 14:15:24 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 14:23:38 2014 -0700

----------------------------------------------------------------------
 .../migration/EntityDataMigration.java          | 139 +++++++++++++++++
 .../migration/EntityTypeMappingMigration.java   |  28 ++--
 .../migration/GraphShardVersionMigration.java   | 104 +++++++------
 .../rx/AllEntitiesInSystemObservable.java       |  27 ++--
 .../corepersistence/StaleIndexCleanupTest.java  |   5 +-
 .../migration/EntityTypeMappingMigrationIT.java |  74 +++++-----
 .../migration/GraphShardVersionMigrationIT.java | 148 +++++++++++--------
 .../rx/AllEntitiesInSystemObservableIT.java     |  23 ++-
 8 files changed, 364 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
new file mode 100644
index 0000000..79d31a8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.functions.Action1;
+
+
+/**
+ * Migration that re-writes each entity's version history from the previous (v1) serialization into the current (v2) one
+ */
+public class EntityDataMigration implements DataMigration {
+
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class );
+
+
+    private final MvccEntitySerializationStrategy v1Serialization;
+    private final MvccEntitySerializationStrategy v2Serialization;
+
+    private final ManagerCache managerCache;
+    private final Keyspace keyspace;
+
+
+    @Inject
+    public EntityDataMigration( @PreviousImpl final MvccEntitySerializationStrategy v1Serialization,
+                                @CurrentImpl final MvccEntitySerializationStrategy v2Serialization,
+                                final ManagerCache managerCache, final Keyspace keyspace ) {
+        this.v1Serialization = v1Serialization;
+        this.v2Serialization = v2Serialization;
+        this.managerCache = managerCache;
+        this.keyspace = keyspace;
+    }
+
+
+    @Override
+    public void migrate( final ProgressObserver observer ) throws Throwable {
+
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext(
+                new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+
+
+                    @Override
+                    public void call(
+                            final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+
+
+                        final UUID now = UUIDGenerator.newTimeUUID();
+
+                        final Id appScopeId = applicationEntityGroup.applicationScope.getApplication();
+
+
+                        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+                        for ( Id entityId : applicationEntityGroup.entityIds ) {
+
+                            CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType(
+                                    appScopeId, entityId.getType() );
+
+
+                            Iterator<MvccEntity> allVersions =
+                                    v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 );
+
+                            while ( allVersions.hasNext() ) {
+                                final MvccEntity version = allVersions.next();
+
+                                final MutationBatch versionBatch = v2Serialization.write( currentScope, version );
+
+                                totalBatch.mergeShallow( versionBatch );
+
+                                if ( totalBatch.getRowCount() >= 50 ) {
+                                    try {
+                                        totalBatch.execute();
+                                    }
+                                    catch ( ConnectionException e ) {
+                                        throw new DataMigrationException( "Unable to migrate batches ", e );
+                                    }
+                                }
+                            }
+                        }
+
+                        try {
+                            totalBatch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new DataMigrationException( "Unable to migrate batches ", e );
+                        }
+                    }
+                } ).toBlocking().last();
+    }
+
+
+    @Override
+    public int getVersion() {
+        return Versions.VERSION_3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 1adfe73..8089dfd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -29,13 +29,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 
@@ -47,9 +44,6 @@ import rx.functions.Action1;
  */
 public class EntityTypeMappingMigration implements DataMigration {
 
-
-    private static final Logger logger = LoggerFactory.getLogger( EntityTypeMappingMigration.class );
-
     private final ManagerCache managerCache;
 
 
@@ -65,25 +59,27 @@ public class EntityTypeMappingMigration implements DataMigration {
 
         final AtomicLong atomicLong = new AtomicLong();
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
 
 
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entityData ) {
+                                         public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
 
-                                             final MapScope ms = CpNamingUtils.getEntityTypeMapScope( entityData.applicationScope.getApplication() );
+                                             final MapScope ms = CpNamingUtils.getEntityTypeMapScope( applicationEntityGroup.applicationScope.getApplication() );
 
 
                                              final MapManager mapManager = managerCache.getMapManager( ms );
 
-                                             final UUID entityUuid = entityData.entityId.getUuid();
-                                             final String entityType = entityData.entityId.getType();
+                                             for(Id entityId: applicationEntityGroup.entityIds) {
+                                                 final UUID entityUuid = entityId.getUuid();
+                                                 final String entityType = entityId.getType();
 
-                                             mapManager.putString( entityUuid.toString(), entityType );
+                                                 mapManager.putString( entityUuid.toString(), entityType );
 
-                                             if ( atomicLong.incrementAndGet() % 100 == 0 ) {
-                                                 updateStatus( atomicLong, observer );
+                                                 if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+                                                     updateStatus( atomicLong, observer );
+                                                 }
                                              }
                                          }
                                      } ).toBlocking().lastOrDefault( null );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
index ac4cd58..3b92570 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
@@ -64,8 +65,7 @@ public class GraphShardVersionMigration implements DataMigration {
 
     @Inject
     public GraphShardVersionMigration( @CurrentImpl final EdgeMetadataSerialization v2Serialization,
-                                       final ManagerCache managerCache, final
-    Keyspace keyspace ) {
+                                       final ManagerCache managerCache, final Keyspace keyspace ) {
         this.v2Serialization = v2Serialization;
         this.managerCache = managerCache;
         this.keyspace = keyspace;
@@ -77,51 +77,71 @@ public class GraphShardVersionMigration implements DataMigration {
 
         final AtomicLong counter = new AtomicLong();
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache).flatMap(
-                new Func1<AllEntitiesInSystemObservable.EntityData, Observable<List<Edge>>>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).flatMap(
+                new Func1<AllEntitiesInSystemObservable.ApplicationEntityGroup, Observable<List<Edge>>>() {
 
 
                     @Override
-                    public Observable<List<Edge>> call( final AllEntitiesInSystemObservable.EntityData entityData ) {
-                        logger.info( "Migrating edges from node {} in scope {}", entityData.entityId,
-                                entityData.applicationScope );
-
-                        final GraphManager gm = managerCache.getGraphManager( entityData.applicationScope );
-
-                        //get each edge from this node as a source
-                        return EdgesFromSourceObservable.edgesFromSource( gm, entityData.entityId )
-
-                                //for each edge, re-index it in v2  every 1000 edges or less
-                                .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
-                                    @Override
-                                    public void call( final List<Edge> edges ) {
-
-                                        final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                        for ( final Edge edge : edges ) {
-                                            logger.info( "Migrating meta for edge {}", edge );
-                                            final MutationBatch edgeBatch =
-                                                    v2Serialization.writeEdge( entityData.applicationScope, edge );
-                                            batch.mergeShallow( edgeBatch );
-                                        }
-
-                                        try {
-                                            batch.execute();
-                                        }
-                                        catch ( ConnectionException e ) {
-                                            throw new RuntimeException( "Unable to perform migration", e );
-                                        }
-
-                                        //update the observer so the admin can see it
-                                        final long newCount = counter.addAndGet( edges.size() );
-
-                                        observer.update( getVersion(), String.format("Currently running.  Rewritten %d edge types", newCount) );
-
-
-                                    }
-                                } );
+                    public Observable<List<Edge>> call(
+                            final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+
+                        //emit a stream of all ids from this group
+                        return Observable.from( applicationEntityGroup.entityIds )
+                                         .flatMap( new Func1<Id, Observable<List<Edge>>>() {
+
+
+                                             //for each id in the group, get its edges
+                                             @Override
+                                             public Observable<List<Edge>> call( final Id id ) {
+                                                 logger.info( "Migrating edges from node {} in scope {}", id,
+                                                         applicationEntityGroup.applicationScope );
+
+                                                 final GraphManager gm = managerCache
+                                                         .getGraphManager( applicationEntityGroup.applicationScope );
+
+                                                 //get each edge from this node as a source
+                                                 return EdgesFromSourceObservable.edgesFromSource( gm, id )
+
+                                                         //for each edge, re-index it in v2  every 1000 edges or less
+                                                         .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
+                                                             @Override
+                                                             public void call( final List<Edge> edges ) {
+
+                                                                 final MutationBatch batch =
+                                                                         keyspace.prepareMutationBatch();
+
+                                                                 for ( final Edge edge : edges ) {
+                                                                     logger.info( "Migrating meta for edge {}", edge );
+                                                                     final MutationBatch edgeBatch = v2Serialization
+                                                                             .writeEdge(
+                                                                                     applicationEntityGroup
+                                                                                             .applicationScope,
+                                                                                     edge );
+                                                                     batch.mergeShallow( edgeBatch );
+                                                                 }
+
+                                                                 try {
+                                                                     batch.execute();
+                                                                 }
+                                                                 catch ( ConnectionException e ) {
+                                                                     throw new RuntimeException(
+                                                                             "Unable to perform migration", e );
+                                                                 }
+
+                                                                 //update the observer so the admin can see it
+                                                                 final long newCount =
+                                                                         counter.addAndGet( edges.size() );
+
+                                                                 observer.update( getVersion(), String.format(
+                                                                         "Currently running.  Rewritten %d edge types",
+                                                                         newCount ) );
+                                                             }
+                                                         } );
+                                             }
+                                         } );
                     }
                 } ).toBlocking().lastOrDefault( null );
+        ;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
index 291bbe9..771b81f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.rx;
 
 
+import java.util.List;
+
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
@@ -40,14 +42,17 @@ public class AllEntitiesInSystemObservable {
 
     /**
      * Return an observable that emits all entities in the system.
+     * @param managerCache the managerCache to use
+     * @param bufferSize The number of entityIds to buffer into each ApplicationEntityGroup.  Note that if an application has more
+     * ids than the buffer size, there may be more than 1 ApplicationEntityGroup with the same application and different ids
      */
-    public static Observable<EntityData> getAllEntitiesInSystem( final ManagerCache managerCache) {
+    public static Observable<ApplicationEntityGroup> getAllEntitiesInSystem( final ManagerCache managerCache, final int bufferSize) {
         //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
         return ApplicationObservable.getAllApplicationIds( managerCache )
 
-                                    .flatMap( new Func1<Id, Observable<EntityData>>() {
+                                    .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
                                         @Override
-                                        public Observable<EntityData> call( final Id applicationId ) {
+                                        public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
 
                                             //set up our application scope and graph manager
                                             final ApplicationScope applicationScope = new ApplicationScopeImpl(
@@ -68,11 +73,11 @@ public class AllEntitiesInSystemObservable {
 
                                             //merge both the specified application node and the entity node
                                             // so they all get used
-                                            return Observable.merge( applicationNode, entityNodes )
-                                                             .map( new Func1<Id, EntityData>() {
+                                            return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
+                                                             .map( new Func1<List<Id>, ApplicationEntityGroup>() {
                                                                  @Override
-                                                                 public EntityData call( final Id id ) {
-                                                                     return new EntityData( applicationScope, id );
+                                                                 public ApplicationEntityGroup call( final List<Id> id ) {
+                                                                     return new ApplicationEntityGroup( applicationScope, id );
                                                                  }
                                                              } );
                                         }
@@ -83,14 +88,14 @@ public class AllEntitiesInSystemObservable {
     /**
      * Get the entity data.  Immutable bean for fast access
      */
-    public static final class EntityData {
+    public static final class ApplicationEntityGroup {
         public final ApplicationScope applicationScope;
-        public final Id entityId;
+        public final List<Id> entityIds;
 
 
-        public EntityData( final ApplicationScope applicationScope, final Id entityId ) {
+        public ApplicationEntityGroup( final ApplicationScope applicationScope, final List<Id> entityIds ) {
             this.applicationScope = applicationScope;
-            this.entityId = entityId;
+            this.entityIds = entityIds;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index fa9f9df..9d0c9e6 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.fasterxml.uuid.UUIDComparator;
 
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -220,9 +221,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         EntityManager em = app.getEntityManager();
 
-        CollectionScope cs = new CollectionScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                CpNamingUtils.getCollectionScopeNameFromEntityType( eref.getType() ) );
+        CollectionScope cs = getCollectionScopeNameFromEntityType(  new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() );
 
         EntityCollectionManagerFactory ecmf = CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index dafdb00..1f0665a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -35,7 +35,6 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.map.impl.MapSerializationImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -46,7 +45,6 @@ import rx.functions.Action1;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 
@@ -75,7 +73,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
     @Test
     public void testIdMapping() throws Throwable {
 
-        assertEquals("version 1 expected", 1, entityTypeMappingMigration.getVersion());
+        assertEquals( "version 1 expected", 1, entityTypeMappingMigration.getVersion() );
 
         final EntityManager newAppEm = app.getEntityManager();
 
@@ -87,7 +85,6 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
 
 
-
         final Set<Id> allEntities = new HashSet<>();
         allEntities.addAll( type1Identities );
         allEntities.addAll( type2Identities );
@@ -106,38 +103,45 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         entityTypeMappingMigration.migrate( progressObserver );
 
 
-
-
-
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
                                              //ensure that each one has a type
-                                             try {
-
-                                                 final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
-                                                 final Entity returned = em.get( entity.entityId.getUuid() );
-
-                                                 //we seem to occasionally get phantom edges.  If this is the case we'll store the type _> uuid mapping, but we won't have anything to load
-                                                if(returned != null) {
-                                                    assertEquals( entity.entityId.getUuid(), returned.getUuid() );
-                                                    assertEquals( entity.entityId.getType(), returned.getType() );
-                                                }
-                                                else {
-                                                    final String type = managerCache.getMapManager( CpNamingUtils.getEntityTypeMapScope(
-                                                            entity.applicationScope.getApplication() ) )
-                                                            .getString( entity.entityId.getUuid().toString() );
-
-                                                    assertEquals(entity.entityId.getType(), type);
-                                                }
-                                             }
-                                             catch ( Exception e ) {
-                                                 throw new RuntimeException( "Unable to get entity " + entity.entityId
-                                                         + " by UUID, migration failed", e );
-                                             }
 
-                                             allEntities.remove( entity.entityId );
+                                             final EntityManager em = emf.getEntityManager(
+                                                     entity.applicationScope.getApplication().getUuid() );
+
+                                             for ( final Id id : entity.entityIds ) {
+                                                 try {
+                                                     final Entity returned = em.get( id.getUuid() );
+
+                                                     //we seem to occasionally get phantom edges.  If this is the
+                                                     // case we'll store the type -> uuid mapping, but we won't have
+                                                     // anything to load
+
+                                                     if ( returned != null ) {
+                                                         assertEquals( id.getUuid(), returned.getUuid() );
+                                                         assertEquals( id.getType(), returned.getType() );
+                                                     }
+                                                     else {
+                                                         final String type = managerCache.getMapManager( CpNamingUtils
+                                                                 .getEntityTypeMapScope(
+                                                                         entity.applicationScope.getApplication() ) )
+                                                                                         .getString( id.getUuid()
+                                                                                                       .toString() );
+
+                                                         assertEquals( id.getType(), type );
+                                                     }
+                                                 }
+                                                 catch ( Exception e ) {
+                                                     throw new RuntimeException( "Unable to get entity " + id
+                                                             + " by UUID, migration failed", e );
+                                                 }
+
+                                                 allEntities.remove( id );
+                                             }
                                          }
                                      } ).toBlocking().lastOrDefault( null );
 
@@ -145,9 +149,5 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         assertEquals( "Every element should have been encountered", 0, allEntities.size() );
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index aab47a0..88c02cd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -32,12 +32,9 @@ import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -114,40 +111,41 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
         //read everything in previous version format and put it into our types.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000)
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+                                         public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
 
-                                             /**
-                                              * Get our edge types from the source
-                                              */
-                                             gm.getEdgeTypesFromSource(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       sourceTypes.put( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
-
-
-                                             /**
-                                              * Get the edge types to the target
-                                              */
-                                             gm.getEdgeTypesToTarget(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       targetTypes.put( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
-
-                                             allEntities.remove( entity.entityId );
+                                             for(final Id id: entity.entityIds) {
+                                                 /**
+                                                  * Get our edge types from the source
+                                                  */
+                                                 gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( id, null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           sourceTypes.put( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+
+
+                                                 /**
+                                                  * Get the edge types to the target
+                                                  */
+                                                 gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id,
+                                                         null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           targetTypes.put( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+
+                                                 allEntities.remove( id );
+                                             }
                                          }
                                      } ).toBlocking().lastOrDefault( null );
 
@@ -169,42 +167,66 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
 
-                                             /**
-                                              * Get our edge types from the source
-                                              */
-                                             gm.getEdgeTypesFromSource(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       sourceTypes.remove( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
-
-
-                                             /**
-                                              * Get the edge types to the target
-                                              */
-                                             gm.getEdgeTypesToTarget(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       targetTypes.remove( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
+                                             for ( final Id id : entity.entityIds ) {
+                                                 /**
+                                                  * Get our edge types from the source
+                                                  */
+                                                 gm.getEdgeTypesFromSource(
+                                                         new SimpleSearchEdgeType( id, null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           sourceTypes.remove( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+
+
+                                                 /**
+                                                  * Get the edge types to the target
+                                                  */
+                                                 gm.getEdgeTypesToTarget(
+                                                         new SimpleSearchEdgeType( id, null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           targetTypes.remove( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+                                             }
+                                             }
                                          }
-                                     } ).toBlocking().lastOrDefault( null );
 
-        assertEquals( "All source types migrated", 0, sourceTypes.size() );
-        assertEquals( "All target types migrated", 0, targetTypes.size() );
+
+                                         ).
+
+
+                                         toBlocking()
+
+
+                                         .
+
+
+                                         lastOrDefault( null );
+
+
+                                         assertEquals( "All source types migrated",0,sourceTypes.size( )
+
+
+                                         );
+
+
+                                         assertEquals( "All target types migrated",0,targetTypes.size( )
+
+
+                                         );
+                                     }
     }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index 423dc1f..4d1c6c9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.corepersistence.rx;
 
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -33,13 +32,11 @@ import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import rx.functions.Action1;
 
@@ -95,26 +92,28 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
             @Override
-            public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+            public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                 assertNotNull(entity);
                 assertNotNull(entity.applicationScope);
-                assertNotNull(entity.entityId);
+                assertNotNull(entity.entityIds);
 
                 //not from our test, don't check it
                 if(!applicationId.equals( entity.applicationScope.getApplication() )){
                     return;
                 }
 
+                for(Id id: entity.entityIds) {
 
-                //we should only emit each node once
-                if ( entity.entityId.getType().equals( type1 ) ) {
-                    assertTrue( "Element should be present on removal", type1Identities.remove( entity.entityId ) );
-                }
-                else if ( entity.entityId.getType().equals( type2 ) ) {
-                    assertTrue( "Element should be present on removal", type2Identities.remove( entity.entityId ) );
+                    //we should only emit each node once
+                    if ( id.getType().equals( type1 ) ) {
+                        assertTrue( "Element should be present on removal", type1Identities.remove( id ) );
+                    }
+                    else if ( id.getType().equals( type2 ) ) {
+                        assertTrue( "Element should be present on removal", type2Identities.remove( id ) );
+                    }
                 }
             }
         } ).toBlocking().lastOrDefault( null );


[3/3] incubator-usergrid git commit: Finished migration testing. All core tests pass.

Posted by to...@apache.org.
Finished migration testing.  All core tests pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e32d5212
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e32d5212
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e32d5212

Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: e32d5212af3c53cc84c9a505cb9bb6824e6a16f2
Parents: 364a6ea
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 15:48:19 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 15:48:19 2014 -0700

----------------------------------------------------------------------
 .../migration/EntityDataMigration.java          |  90 ++++---
 .../rx/ApplicationObservable.java               |  65 +++--
 .../migration/EntityDataMigrationIT.java        | 251 +++++++++++++++++++
 .../migration/GraphShardVersionMigrationIT.java | 131 +++++-----
 4 files changed, 394 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
index 79d31a8..767574d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.migration;
 
 import java.util.Iterator;
 import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
@@ -53,9 +51,6 @@ import rx.functions.Action1;
 public class EntityDataMigration implements DataMigration {
 
 
-    private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class );
-
-
     private final MvccEntitySerializationStrategy v1Serialization;
     private final MvccEntitySerializationStrategy v2Serialization;
 
@@ -77,58 +72,71 @@ public class EntityDataMigration implements DataMigration {
     @Override
     public void migrate( final ProgressObserver observer ) throws Throwable {
 
+        final AtomicLong atomicLong = new AtomicLong();
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext(
-                new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
 
+                                                 @Override
+                                                 public void call(
+                                                         final AllEntitiesInSystemObservable.ApplicationEntityGroup
+                                                                 applicationEntityGroup ) {
 
-                    @Override
-                    public void call(
-                            final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
 
+                                                     final UUID now = UUIDGenerator.newTimeUUID();
 
-                        final UUID now = UUIDGenerator.newTimeUUID();
+                                                     final Id appScopeId =
+                                                             applicationEntityGroup.applicationScope.getApplication();
 
-                        final Id appScopeId = applicationEntityGroup.applicationScope.getApplication();
 
+                                                     final MutationBatch totalBatch = keyspace.prepareMutationBatch();
 
-                        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+                                                     //go through each entity in the system, and load its entire
+                                                     // history
+                                                     for ( Id entityId : applicationEntityGroup.entityIds ) {
 
-                        for ( Id entityId : applicationEntityGroup.entityIds ) {
+                                                         CollectionScope currentScope = CpNamingUtils
+                                                                 .getCollectionScopeNameFromEntityType( appScopeId,
+                                                                         entityId.getType() );
 
-                            CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType(
-                                    appScopeId, entityId.getType() );
 
+                                                         //for each element in the history in the previous version,
+                                                         // copy it to the CF in v2
+                                                         Iterator<MvccEntity> allVersions = v1Serialization
+                                                                 .loadDescendingHistory( currentScope, entityId, now,
+                                                                         1000 );
 
-                            Iterator<MvccEntity> allVersions =
-                                    v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 );
+                                                         while ( allVersions.hasNext() ) {
+                                                             final MvccEntity version = allVersions.next();
 
-                            while ( allVersions.hasNext() ) {
-                                final MvccEntity version = allVersions.next();
+                                                             final MutationBatch versionBatch =
+                                                                     v2Serialization.write( currentScope, version );
 
-                                final MutationBatch versionBatch = v2Serialization.write( currentScope, version );
+                                                             totalBatch.mergeShallow( versionBatch );
+
+                                                             if ( atomicLong.incrementAndGet() % 50 == 0 ) {
+                                                                 executeBatch( totalBatch, observer, atomicLong );
+                                                             }
+                                                         }
+                                                     }
+
+                                                     executeBatch( totalBatch, observer, atomicLong );
+                                                 }
+                                             } ).toBlocking().last();
+    }
 
-                                totalBatch.mergeShallow( versionBatch );
 
-                                if ( totalBatch.getRowCount() >= 50 ) {
-                                    try {
-                                        totalBatch.execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new DataMigrationException( "Unable to migrate batches ", e );
-                                    }
-                                }
-                            }
-                        }
+    private void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+        try {
+            batch.execute();
 
-                        try {
-                            totalBatch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new DataMigrationException( "Unable to migrate batches ", e );
-                        }
-                    }
-                } ).toBlocking().last();
+            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+        }
+        catch ( ConnectionException e ) {
+            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            throw new DataMigrationException( "Unable to migrate batches ", e );
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 898812b..6019bca 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -23,18 +23,20 @@ package org.apache.usergrid.corepersistence.rx;
 import java.util.Arrays;
 import java.util.UUID;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -47,14 +49,12 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicat
 /**
  * An observable that will emit all application stored in the system.
  */
-public class ApplicationObservable  {
-
+public class ApplicationObservable {
 
+    private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class );
 
     /**
      * Get all applicationIds as an observable
-     * @param managerCache
-     * @return
      */
     public static Observable<Id> getAllApplicationIds( final ManagerCache managerCache ) {
 
@@ -62,22 +62,20 @@ public class ApplicationObservable  {
         //this way consumers can perform whatever work they need to on the root system first
 
 
-       final Observable<Id> systemIds =  Observable.from( Arrays.asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
-                generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
-                generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
+        final Observable<Id> systemIds = Observable.from( Arrays
+                .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+                        generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+                        generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
 
 
         final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
 
-        final CollectionScope appInfoCollectionScope = new CollectionScopeImpl(
-                appScope.getApplication(),
-                appScope.getApplication(),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ));
+        final CollectionScope appInfoCollectionScope =
+                new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
 
-        final EntityCollectionManager
-                collectionManager = managerCache.getEntityCollectionManager( appInfoCollectionScope );
+        final EntityCollectionManager collectionManager =
+                managerCache.getEntityCollectionManager( appInfoCollectionScope );
 
 
         final GraphManager gm = managerCache.getGraphManager( appScope );
@@ -97,21 +95,34 @@ public class ApplicationObservable  {
                 //get the app info and load it
                 final Id appInfo = edge.getTargetNode();
 
-                return collectionManager.load( appInfo ).map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
+                return collectionManager.load( appInfo )
+                        //filter out null entities
+                        .filter( new Func1<Entity, Boolean>() {
+                            @Override
+                            public Boolean call( final Entity entity ) {
+                                if ( entity == null ) {
+                                    logger.warn( "Encountered a null application info for id {}", appInfo );
+                                    return false;
+                                }
 
+                                return true;
+                            }
+                        } )
+                                //get the id from the entity
+                        .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
 
-                    @Override
-                    public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
-                        final UUID  uuid = (UUID )entity.getField( "applicationUuid" ).getValue();
 
-                        return CpNamingUtils.generateApplicationId(uuid);
-                    }
-                } );
+                            @Override
+                            public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
+
+                                final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+                                return CpNamingUtils.generateApplicationId( uuid );
+                            }
+                        } );
             }
         } );
 
-        return Observable.merge( systemIds, appIds);
+        return Observable.merge( systemIds, appIds );
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
new file mode 100644
index 0000000..5c9e14c
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+import rx.functions.Action1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class EntityDataMigrationIT extends AbstractCoreIT {
+
+
+    private Injector injector;
+
+
+    private EntityDataMigration entityDataMigration;
+    private ManagerCache managerCache;
+    private DataMigrationManager dataMigrationManager;
+    private MigrationInfoSerialization migrationInfoSerialization;
+    private MvccEntitySerializationStrategy v1Strategy;
+    private MvccEntitySerializationStrategy v2Strategy;
+    private EntityManagerFactory emf;
+
+
+    @Before
+    public void setup() {
+        emf = setup.getEmf();
+        injector = CpSetup.getInjector();
+        entityDataMigration = injector.getInstance( EntityDataMigration.class );
+        managerCache = injector.getInstance( ManagerCache.class );
+        dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+        migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
+        v1Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, PreviousImpl.class) );
+        v2Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, CurrentImpl.class) );
+    }
+
+
+    @Test
+    public void testDataMigration() throws Throwable {
+
+        assertEquals( "version 3 expected", 3, entityDataMigration.getVersion() );
+
+
+        /**
+         * Reset to our version -1 and start the migration
+         */
+        dataMigrationManager.resetToVersion( entityDataMigration.getVersion() - 1 );
+
+
+        final EntityManager newAppEm = app.getEntityManager();
+
+        final String type1 = "type1thing";
+        final String type2 = "type2thing";
+        final int size = 10;
+
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
+
+
+        final Set<Id> createdEntityIds = new HashSet<>();
+        createdEntityIds.addAll( type1Identities );
+        createdEntityIds.addAll( type2Identities );
+
+
+        final TestProgressObserver progressObserver = new TestProgressObserver();
+
+
+        //load everything that appears in v1, migrate and ensure it appears in v2
+        final Set<MvccEntity> savedEntities = new HashSet<>( 10000 );
+        //set that holds all entityIds for later assertion
+        final Set<Id> entityIds = new HashSet<>(10000);
+
+
+        //read everything in the previous version format and collect it into the sets above.  Assumes we're
+        //using a test system, and it's not a huge amount of data, otherwise these in-memory sets will overflow.
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+                                         @Override
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+
+                                             //add all versions from history to our comparison
+                                             for ( final Id id : entity.entityIds ) {
+
+                                                 CollectionScope scope = CpNamingUtils
+                                                         .getCollectionScopeNameFromEntityType(
+                                                                 entity.applicationScope.getApplication(),
+                                                                 id.getType() );
+
+                                                 final Iterator<MvccEntity> versions = v1Strategy
+                                                         .loadDescendingHistory( scope, id, UUIDGenerator.newTimeUUID(),
+                                                                 100 );
+
+                                                 while ( versions.hasNext() ) {
+
+                                                     final MvccEntity mvccEntity = versions.next();
+
+                                                     savedEntities.add( mvccEntity );
+
+                                                     createdEntityIds.remove( mvccEntity.getId() );
+
+                                                     entityIds.add( id );
+                                                 }
+                                             }
+                                         }
+                                     } ).toBlocking().lastOrDefault( null );
+
+        assertEquals( "Newly saved entities encountered", 0, createdEntityIds.size() );
+        assertTrue( "Saved new entities", savedEntities.size() > 0 );
+
+        //perform the migration
+        entityDataMigration.migrate( progressObserver );
+
+        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+        //write the status and version, then invalidate the cache so it appears
+        migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
+        migrationInfoSerialization.setVersion( entityDataMigration.getVersion() );
+        dataMigrationManager.invalidate();
+
+        assertEquals( "New version saved, and we should get new implementation", entityDataMigration.getVersion(),
+                dataMigrationManager.getCurrentVersion() );
+
+
+        //now visit all entities in the system again, load them from v2, and ensure they're the same
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+                                                    @Override
+                                                    public void call(
+                                                            final AllEntitiesInSystemObservable
+                                                                    .ApplicationEntityGroup entity ) {
+                                                        //remove every version readable via v2 from the saved set; leftovers were not migrated
+                                                        for ( final Id id : entity.entityIds ) {
+
+                                                            CollectionScope scope = CpNamingUtils
+                                                                    .getCollectionScopeNameFromEntityType(
+                                                                            entity.applicationScope.getApplication(),
+                                                                            id.getType() );
+
+                                                            final Iterator<MvccEntity> versions = v2Strategy
+                                                                    .loadDescendingHistory( scope, id,
+                                                                            UUIDGenerator.newTimeUUID(), 100 );
+
+                                                            while ( versions.hasNext() ) {
+
+                                                                final MvccEntity mvccEntity = versions.next();
+
+                                                                savedEntities.remove( mvccEntity );
+                                                            }
+                                                        }
+                                                    }
+                                                }
+
+
+                                              ).toBlocking().lastOrDefault( null );
+
+
+        assertEquals( "All entities migrated", 0, savedEntities.size() );
+
+
+        //now visit all entities in the system again, and load them from the EM, ensure we see everything we did in the v1 traversal
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+                                                    @Override
+                                                    public void call(
+                                                            final AllEntitiesInSystemObservable
+                                                                    .ApplicationEntityGroup entity ) {
+
+                                                        final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
+
+                                                        //load each entity through the entity manager and remove its id from the expected set
+                                                        for ( final Id id : entity.entityIds ) {
+
+
+                                                            try {
+                                                                final Entity emEntity = em.get( SimpleEntityRef.fromId( id ) );
+
+                                                                if(emEntity != null){
+                                                                    entityIds.remove( id );
+                                                                }
+                                                            }
+                                                            catch ( Exception e ) {
+                                                                throw new RuntimeException("Error loading entity", e);
+                                                            }
+                                                        }
+                                                    }
+                                                }
+
+
+                                              ).toBlocking().lastOrDefault( null );
+
+
+        assertEquals("All entities could be loaded by the entity manager", 0, entityIds.size());
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 88c02cd..51ea052 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.inject.Injector;
-import com.netflix.astyanax.Keyspace;
 
 import rx.functions.Action1;
 
@@ -55,7 +54,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
     private Injector injector;
     private GraphShardVersionMigration graphShardVersionMigration;
-    private Keyspace keyspace;
     private ManagerCache managerCache;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
@@ -65,7 +63,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
     public void setup() {
         injector = CpSetup.getInjector();
         graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
-        keyspace = injector.getInstance( Keyspace.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
@@ -75,15 +72,12 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
     @Test
     public void testIdMapping() throws Throwable {
 
-        assertEquals("version 2 expected", 2, graphShardVersionMigration.getVersion());
+        assertEquals( "version 2 expected", 2, graphShardVersionMigration.getVersion() );
 
         /**
          * Reset to our version -1 and start the migration
          */
-        dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion()-1 );
-
-
-
+        dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion() - 1 );
 
 
         final EntityManager newAppEm = app.getEntityManager();
@@ -111,15 +105,16 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
         //read everything in previous version format and put it into our types.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000)
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
                                      .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
 
-                                             for(final Id id: entity.entityIds) {
+                                             for ( final Id id : entity.entityIds ) {
                                                  /**
                                                   * Get our edge types from the source
                                                   */
@@ -135,8 +130,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
                                                  /**
                                                   * Get the edge types to the target
                                                   */
-                                                 gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id,
-                                                         null, null ) )
+                                                 gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id, null, null ) )
                                                    .doOnNext( new Action1<String>() {
                                                        @Override
                                                        public void call( final String s ) {
@@ -153,7 +147,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
         //perform the migration
         graphShardVersionMigration.migrate( progressObserver );
 
-        assertEquals("Newly saved entities encounterd", 0, allEntities.size());
+        assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
 
@@ -163,70 +157,57 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
         migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
         dataMigrationManager.invalidate();
 
-        assertEquals("New version saved, and we should get new implementation", graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion());
+        assertEquals( "New version saved, and we should get new implementation",
+                graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion() );
 
 
         //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
         AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
                                      .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
-                                         @Override
-                                         public void call(
-                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
-
-                                             final GraphManager gm =
-                                                     managerCache.getGraphManager( entity.applicationScope );
-
-                                             for ( final Id id : entity.entityIds ) {
-                                                 /**
-                                                  * Get our edge types from the source
-                                                  */
-                                                 gm.getEdgeTypesFromSource(
-                                                         new SimpleSearchEdgeType( id, null, null ) )
-                                                   .doOnNext( new Action1<String>() {
-                                                       @Override
-                                                       public void call( final String s ) {
-                                                           sourceTypes.remove( id, s );
-                                                       }
-                                                   } ).toBlocking().lastOrDefault( null );
-
-
-                                                 /**
-                                                  * Get the edge types to the target
-                                                  */
-                                                 gm.getEdgeTypesToTarget(
-                                                         new SimpleSearchEdgeType( id, null, null ) )
-                                                   .doOnNext( new Action1<String>() {
-                                                       @Override
-                                                       public void call( final String s ) {
-                                                           targetTypes.remove( id, s );
-                                                       }
-                                                   } ).toBlocking().lastOrDefault( null );
-                                             }
-                                             }
-                                         }
-
-
-                                         ).
-
-
-                                         toBlocking()
-
-
-                                         .
-
-
-                                         lastOrDefault( null );
-
-
-                                         assertEquals( "All source types migrated",0,sourceTypes.size( )
-
-
-                                         );
-
-
-                                         assertEquals( "All target types migrated",0,targetTypes.size( )
-
-
-                                         );
-                                     }
+                                                    @Override
+                                                    public void call(
+                                                            final AllEntitiesInSystemObservable
+                                                                    .ApplicationEntityGroup entity ) {
+
+                                                        final GraphManager gm =
+                                                                managerCache.getGraphManager( entity.applicationScope );
+
+                                                        for ( final Id id : entity.entityIds ) {
+                                                            /**
+                                                             * Get our edge types from the source
+                                                             */
+                                                            gm.getEdgeTypesFromSource(
+                                                                    new SimpleSearchEdgeType( id, null, null ) )
+                                                              .doOnNext( new Action1<String>() {
+                                                                  @Override
+                                                                  public void call( final String s ) {
+                                                                      sourceTypes.remove( id, s );
+                                                                  }
+                                                              } ).toBlocking().lastOrDefault( null );
+
+
+                                                            /**
+                                                             * Get the edge types to the target
+                                                             */
+                                                            gm.getEdgeTypesToTarget(
+                                                                    new SimpleSearchEdgeType( id, null, null ) )
+                                                              .doOnNext( new Action1<String>() {
+                                                                  @Override
+                                                                  public void call( final String s ) {
+                                                                      targetTypes.remove( id, s );
+                                                                  }
+                                                              } ).toBlocking().lastOrDefault( null );
+                                                        }
+                                                    }
+                                                }
+
+
+                                              ).toBlocking().lastOrDefault( null );
+
+
+        assertEquals( "All source types migrated", 0, sourceTypes.size() );
+
+
+        assertEquals( "All target types migrated", 0, targetTypes.size() );
     }
+}


[2/3] incubator-usergrid git commit: Refactored generation of collection scope to be re-usable with CpNamingUtils for consistency across modules

Posted by to...@apache.org.
Refactored generation of collection scope to be re-usable with CpNamingUtils for consistency across modules


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/364a6ea6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/364a6ea6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/364a6ea6

Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: 364a6ea6737194a1dd266b6d0e89abb4a40ddf96
Parents: 9ed7964
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 14:24:03 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 14:24:03 2014 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 44 ++++++--------------
 .../corepersistence/CpEntityManagerFactory.java |  2 +-
 .../corepersistence/CpRelationManager.java      | 35 ++++------------
 .../corepersistence/util/CpNamingUtils.java     | 19 +++++++++
 4 files changed, 41 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index e2f67e8..e6b8bce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -131,6 +131,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery
 import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -372,13 +373,9 @@ public class CpEntityManager implements EntityManager {
         }
 
         Id id = new SimpleId( entityRef.getUuid(), entityRef.getType() );
-        String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
 
-        CollectionScope collectionScope =
-                new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                        collectionName );
+        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(),  entityRef.getType());
 
-        EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
         //        if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
         //            throw new IllegalArgumentException(
@@ -457,13 +454,10 @@ public class CpEntityManager implements EntityManager {
         String type = Schema.getDefaultSchema().getEntityType( entityClass );
 
         Id id = new SimpleId( entityId, type );
-        String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( type );
 
-        CollectionScope collectionScope =
-                new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                        collectionName );
 
-        EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
+        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(),  type);
+
 
         //        if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
         //            throw new IllegalArgumentException(
@@ -522,9 +516,8 @@ public class CpEntityManager implements EntityManager {
     public void update( Entity entity ) throws Exception {
 
         // first, update entity index in its own collection scope
-        CollectionScope collectionScope =
-                new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                        CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType() ) );
+
+        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(),  entity.getType());
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
         Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
@@ -590,9 +583,7 @@ public class CpEntityManager implements EntityManager {
 
     private Observable deleteAsync( EntityRef entityRef ) throws Exception {
 
-        CollectionScope collectionScope =
-                new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                        CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+        CollectionScope collectionScope =getCollectionScopeNameFromEntityType(applicationScope.getApplication(),  entityRef.getType()  );
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
@@ -639,7 +630,7 @@ public class CpEntityManager implements EntityManager {
 
             // deindex from default index scope
             IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
-                    CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+                    getCollectionScopeNameFromEntityType( entityRef.getType() ) );
 
             batch.deindex( defaultIndexScope, entity );
 
@@ -1010,15 +1001,10 @@ public class CpEntityManager implements EntityManager {
 
     @Override
     public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
-
-        String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
-
-        CollectionScope collectionScope =
-                new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                        collectionName );
+        CollectionScope collectionScope =  getCollectionScopeNameFromEntityType(getApplicationScope().getApplication(), entityRef.getType());
 
         IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
-                CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+                getCollectionScopeNameFromEntityType( entityRef.getType() ) );
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
         EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() );
@@ -2187,10 +2173,7 @@ public class CpEntityManager implements EntityManager {
     private Id getIdForUniqueEntityField( final String collectionName, final String propertyName,
                                           final Object propertyValue ) {
 
-        CollectionScope collectionScope =
-                new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
-                        CpNamingUtils.getCollectionScopeNameFromEntityType( collectionName ) );
-
+        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), collectionName);
 
         final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
@@ -2514,9 +2497,8 @@ public class CpEntityManager implements EntityManager {
         org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityToCpEntity( entity, importId );
 
         // prepare to write and index Core Persistence Entity into default scope
-        CollectionScope collectionScope =
-                new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
-                        CpNamingUtils.getCollectionScopeNameFromEntityType( eType ) );
+        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), eType);
+
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
         if ( logger.isDebugEnabled() ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e07bb00..82e80a5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -541,7 +541,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public long performEntityCount() {
         //TODO, this really needs to be a task that writes this data somewhere since this will get
         //progressively slower as the system expands
-        return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).longCount().toBlocking().last();
+        return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).longCount().toBlocking().last();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 8514504..dcb4ba1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -120,6 +120,7 @@ import rx.functions.Func1;
 import static java.util.Arrays.asList;
 
 import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
@@ -219,10 +220,7 @@ public class CpRelationManager implements RelationManager {
         this.indexBucketLocator = indexBucketLocator; // TODO: this also
 
         // load the Core Persistence version of the head entity as well
-        this.headEntityScope = new CollectionScopeImpl(
-            this.applicationScope.getApplication(),
-            this.applicationScope.getApplication(),
-            CpNamingUtils.getCollectionScopeNameFromEntityType( headEntity.getType() ) );
+        this.headEntityScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), headEntity.getType());
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading head entity {}:{} from scope\n   app {}\n   owner {}\n   name {}",
@@ -606,10 +604,7 @@ public class CpRelationManager implements RelationManager {
     public Entity addToCollection( String collName, EntityRef itemRef, boolean connectBack ) 
             throws Exception {
 
-        CollectionScope memberScope = new CollectionScopeImpl( 
-            applicationScope.getApplication(),
-            applicationScope.getApplication(),
-            CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
+        CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
 
         Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() ); 
         org.apache.usergrid.persistence.model.entity.Entity memberEntity = 
@@ -642,10 +637,7 @@ public class CpRelationManager implements RelationManager {
         }
 
         // load the new member entity to be added to the collection from its default scope
-        CollectionScope memberScope = new CollectionScopeImpl(
-                applicationScope.getApplication(),
-                applicationScope.getApplication(),
-                CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
+        CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
 
         //TODO, this double load should disappear once events are in
         Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
@@ -797,11 +789,7 @@ public class CpRelationManager implements RelationManager {
         }
 
         // load the entity to be removed to the collection
-        CollectionScope memberScope = new CollectionScopeImpl(
-                this.applicationScope.getApplication(),
-                this.applicationScope.getApplication(),
-                CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
-        EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager( memberScope );
+        CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading entity to remove from collection "
@@ -1017,10 +1005,7 @@ public class CpRelationManager implements RelationManager {
 
         ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
 
-        CollectionScope targetScope = new CollectionScopeImpl(
-            applicationScope.getApplication(),
-            applicationScope.getApplication(),
-            CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ) );
+        CollectionScope targetScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), connectedEntityRef.getType());
 
         if ( logger.isDebugEnabled() ) {
             logger.debug("createConnection(): "
@@ -1249,12 +1234,8 @@ public class CpRelationManager implements RelationManager {
 
         String connectionType = connectionRef.getConnectedEntity().getConnectionType();
 
-        CollectionScope targetScope = new CollectionScopeImpl(
-                applicationScope.getApplication(),
-                applicationScope.getApplication(),
-                CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType()) );
-
-        EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager( targetScope );
+        CollectionScope targetScope = getCollectionScopeNameFromEntityType( applicationScope.getApplication(),
+                connectedEntityRef.getType() );
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Deleting connection '{}' from source {}:{} \n   to target {}:{}",

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 6fb35a3..684b6e0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.corepersistence.util;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
@@ -70,6 +72,23 @@ public class CpNamingUtils {
     public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
 
 
+    /**
+     * Generate the collection scope for the given entity type; the application's Id is passed for both Id arguments of the scope
+     * @param applicationId The Id of the application that owns the entity
+     * @param type The entity type stored in the collection
+     * @return The collection scope whose name is derived from the entity type
+     */
+    public static CollectionScope getCollectionScopeNameFromEntityType(final Id applicationId, final String type){
+       return
+                       new CollectionScopeImpl( applicationId, applicationId,
+                               getCollectionScopeNameFromEntityType( type ) );
+    }
+
+    /**
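+     * Get the collection scope name for the given entity/id type
+     * @param type The entity/id type to derive the collection scope name from
+     * @return The lower-cased name: EDGE_COLL_SUFFIX followed by the schema's default collection name for the type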
+     */
     public static String getCollectionScopeNameFromEntityType( String type ) {
         String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
         return csn.toLowerCase();