You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/16 19:08:23 UTC

incubator-usergrid git commit: adding specific migration types

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-365 e52246952 -> 79ec9787c


adding specific migration types


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/79ec9787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/79ec9787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/79ec9787

Branch: refs/heads/USERGRID-365
Commit: 79ec9787c185b3e1a43aba5f456b5568b5735235
Parents: e522469
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Feb 16 10:07:55 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Feb 16 10:07:55 2015 -0800

----------------------------------------------------------------------
 .../usergrid/corepersistence/GuiceModule.java   |   5 +-
 .../migration/EntityTypeMappingMigration.java   |  48 +++--
 .../impl/AllEntitiesInSystemObservableImpl.java | 112 +++++------
 .../rx/impl/ApplicationObservableImpl.java      |  12 ++
 .../migration/EntityDataMigrationIT.java        |  24 +--
 .../org/apache/usergrid/persistence/GeoIT.java  |   5 +-
 .../impl/MvccEntityDataMigrationImpl.java       |  97 +++++-----
 .../serialization/impl/SerializationModule.java |   9 +-
 .../persistence/core/guice/CommonModule.java    |   9 +-
 .../data/ApplicationDataMigration.java          |  38 ++++
 .../migration/data/CollectionDataMigration.java |  36 ++++
 .../core/migration/data/DataMigration.java      |  12 +-
 .../data/DataMigrationManagerImpl.java          | 193 ++++++++++++-------
 .../core/rx/AllEntitiesInSystemObservable.java  |   6 +-
 .../core/rx/ApplicationObservable.java          |   6 +
 .../core/guice/MaxMigrationModule.java          |   3 +-
 .../core/guice/MaxMigrationVersion.java         |   9 +-
 .../data/DataMigrationManagerImplTest.java      |  70 ++++---
 .../core/rx/ApplicationsTestObservable.java     |   6 +
 .../persistence/graph/guice/GraphModule.java    |   9 +-
 .../impl/EdgeDataMigrationImpl.java             | 122 ++++++------
 21 files changed, 495 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 4c1e5c3..7098572 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
@@ -79,8 +80,8 @@ public class GuiceModule  extends AbstractModule {
         bind(AllEntitiesInSystemObservable.class).to( AllEntitiesInSystemObservableImpl.class );
         bind(ApplicationObservable.class).to( ApplicationObservableImpl.class );
 
-        Multibinder<DataMigration> dataMigrationMultibinder =
-                Multibinder.newSetBinder( binder(), DataMigration.class );
+        Multibinder<CollectionDataMigration> dataMigrationMultibinder =
+                Multibinder.newSetBinder( binder(), CollectionDataMigration.class );
         dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
 
         Multibinder<EntityDeleted> entityBinder =

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 81503d5..4dbc373 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
@@ -46,7 +47,7 @@ import rx.schedulers.Schedulers;
 /**
  * Migration to ensure that our entity id is written into our map data
  */
-public class EntityTypeMappingMigration implements DataMigration {
+public class EntityTypeMappingMigration implements CollectionDataMigration {
 
     private final ManagerCache managerCache;
     private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@@ -60,29 +61,34 @@ public class EntityTypeMappingMigration implements DataMigration {
 
 
     @Override
-    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer) throws Throwable {
+    public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final ProgressObserver observer) throws Throwable {
 
         final AtomicLong atomicLong = new AtomicLong();
 
-        final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
-
-        final MapManager mapManager = managerCache.getMapManager(ms);
-        return Observable.from(applicationEntityGroup.entityIds)
-            .subscribeOn(Schedulers.io())
-            .map(new Func1<EntityIdScope, Long>() {
-                @Override
-                public Long call(EntityIdScope idScope) {
-                    final UUID entityUuid = idScope.getId().getUuid();
-                    final String entityType = idScope.getId().getType();
-
-                    mapManager.putString(entityUuid.toString(), entityType);
-
-                    if (atomicLong.incrementAndGet() % 100 == 0) {
-                        updateStatus(atomicLong, observer);
-                    }
-                    return atomicLong.get();
-                }
-            });
+        return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Long>>() {
+            @Override
+            public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
+                final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
+
+                final MapManager mapManager = managerCache.getMapManager(ms);
+                return Observable.from(applicationEntityGroup.entityIds)
+                    .subscribeOn(Schedulers.io())
+                    .map(new Func1<EntityIdScope, Long>() {
+                        @Override
+                        public Long call(EntityIdScope idScope) {
+                            final UUID entityUuid = idScope.getId().getUuid();
+                            final String entityType = idScope.getId().getType();
+
+                            mapManager.putString(entityUuid.toString(), entityType);
+
+                            if (atomicLong.incrementAndGet() % 100 == 0) {
+                                updateStatus(atomicLong, observer);
+                            }
+                            return atomicLong.get();
+                        }
+                    });
+            }
+        });
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
index a4fbc5d..55667cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
@@ -53,71 +53,75 @@ public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObs
     private final TargetIdObservable targetIdObservable;
 
     @Inject
-    public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable){
+    public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable) {
 
         this.applicationObservable = applicationObservable;
         this.graphManagerFactory = graphManagerFactory;
         this.targetIdObservable = targetIdObservable;
     }
 
-    public  Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final int bufferSize) {
-        return getAllEntitiesInSystem(applicationObservable.getAllApplicationIds( ),bufferSize);
+    public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final int bufferSize) {
+        return getAllEntitiesInSystem(applicationObservable.getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
+            @Override
+            public ApplicationScope call(Id id) {
+                return new ApplicationScopeImpl(id);
+            }
+        }), bufferSize);
 
     }
 
-    public  Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final Observable<Id> appIdsObservable, final int bufferSize) {
+    public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final Observable<ApplicationScope> appIdsObservable, final int bufferSize) {
         //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
-        return appIdsObservable.flatMap(new Func1<Id, Observable<ApplicationEntityGroup<CollectionScope>>>() {
-            @Override
-            public Observable<ApplicationEntityGroup<CollectionScope>> call(final Id applicationId) {
-
-                //set up our application scope and graph manager
-                final ApplicationScope applicationScope = new ApplicationScopeImpl(
-                    applicationId);
-
-                final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
-
-                //load all nodes that are targets of our application node.  I.E.
-                // entities that have been saved
-                final Observable<Id> entityNodes =
-                    targetIdObservable.getTargetNodes(gm, applicationId);
-
-
-                //get scope here
-
-
-                //emit Scope + ID
-
-                //create our application node to emit since it's an entity as well
-                final Observable<Id> applicationNode = Observable.just(applicationId);
-
-                //merge both the specified application node and the entity node
-                // so they all get used
-                return Observable
-                    .merge(applicationNode, entityNodes)
-                    .buffer(bufferSize)
-                    .map(new Func1<List<Id>, List<EntityIdScope<CollectionScope>>>() {
-                        @Override
-                        public List<EntityIdScope<CollectionScope>> call(List<Id> ids) {
-                            List<EntityIdScope<CollectionScope>> scopes = new ArrayList<>(ids.size());
-                            for (Id id : ids) {
-                                CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId, id.getType());
-                                EntityIdScope<CollectionScope> idScope = new EntityIdScope<>(id, scope);
-                                scopes.add(idScope);
-                            }
-                            return scopes;
-                        }
-                    })
-                    .map(new Func1<List<EntityIdScope<CollectionScope>>, ApplicationEntityGroup<CollectionScope>>() {
-                        @Override
-                        public ApplicationEntityGroup<CollectionScope> call(final List<EntityIdScope<CollectionScope>> scopes) {
-                            return new ApplicationEntityGroup<>(applicationScope, scopes);
-                        }
-                    });
-            }
-                                    } );
+        return appIdsObservable
+            .flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup<CollectionScope>>>() {
+                @Override
+                public Observable<ApplicationEntityGroup<CollectionScope>> call(final ApplicationScope applicationScope) {
+                    return getAllEntities(applicationScope, bufferSize);
+                }
+            });
     }
 
-
+    private Observable<ApplicationEntityGroup<CollectionScope>> getAllEntities(final ApplicationScope applicationScope, final int bufferSize) {
+        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+        final Id applicationId = applicationScope.getApplication();
+
+        //load all nodes that are targets of our application node.  I.E.
+        // entities that have been saved
+        final Observable<Id> entityNodes =
+            targetIdObservable.getTargetNodes(gm, applicationId);
+
+
+        //get scope here
+
+
+        //emit Scope + ID
+
+        //create our application node to emit since it's an entity as well
+        final Observable<Id> applicationNode = Observable.just(applicationId);
+
+        //merge both the specified application node and the entity node
+        // so they all get used
+        return Observable
+            .merge(applicationNode, entityNodes)
+            .buffer(bufferSize)
+            .map(new Func1<List<Id>, List<EntityIdScope<CollectionScope>>>() {
+                @Override
+                public List<EntityIdScope<CollectionScope>> call(List<Id> ids) {
+                    List<EntityIdScope<CollectionScope>> scopes = new ArrayList<>(ids.size());
+                    for (Id id : ids) {
+                        CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId, id.getType());
+                        EntityIdScope<CollectionScope> idScope = new EntityIdScope<>(id, scope);
+                        scopes.add(idScope);
+                    }
+                    return scopes;
+                }
+            })
+            .map(new Func1<List<EntityIdScope<CollectionScope>>, ApplicationEntityGroup<CollectionScope>>() {
+                @Override
+                public ApplicationEntityGroup<CollectionScope> call(final List<EntityIdScope<CollectionScope>> scopes) {
+                    return new ApplicationEntityGroup<>(applicationScope, scopes);
+                }
+            });
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
index 9a5e084..f4a1057 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import com.google.inject.Inject;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,4 +135,15 @@ public class ApplicationObservableImpl implements ApplicationObservable {
 
         return Observable.merge( systemIds, appIds );
     }
+
+    @Override
+    public Observable<ApplicationScope> getAllApplicationScopes() {
+        return getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
+            @Override
+            public ApplicationScope call(Id id) {
+                ApplicationScope scope = new ApplicationScopeImpl(id);
+                return scope;
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index 2da1f37..4d87f8b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -27,8 +27,9 @@ import java.util.Set;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntityDataMigrationImpl;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.*;
 import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.scope.EntityIdScope;
 import org.junit.Before;
@@ -47,9 +48,6 @@ import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -72,7 +70,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
     private Injector injector;
 
 
-    private DataMigration entityDataMigration;
+    private CollectionDataMigration entityDataMigration;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
     private MvccEntitySerializationStrategy v1Strategy;
@@ -89,6 +87,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
 
             new MigrationTestRule( app, SpringResource.getInstance().getBean( Injector.class ) ,MvccEntityDataMigrationImpl.class  );
     private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
+    private ApplicationObservable applicationObservable;
 
 
     @Before
@@ -101,6 +100,8 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
         MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
         allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
+        applicationObservable = injector.getInstance(ApplicationObservable.class);
+
         v1Strategy = strategy.getMigration().from();
         v2Strategy = strategy.getMigration().to();
     }
@@ -177,18 +178,9 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         assertTrue( "Saved new entities", savedEntities.size() > 0 );
 
         //perform the migration
-        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
-            .doOnNext(new Action1<ApplicationEntityGroup>() {
-                @Override
-                public void call(ApplicationEntityGroup applicationEntityGroup) {
-                   try {
-                       entityDataMigration.migrate(applicationEntityGroup, progressObserver).toBlocking().last();
-                   }catch (Throwable e){
-                       throw new RuntimeException(e);
-                   }
-                }
-            }).toBlocking().last();
+        rx.Observable<ApplicationEntityGroup> oRx = allEntitiesInSystemObservable.getAllEntitiesInSystem(applicationObservable.getAllApplicationScopes(), 1000);
 
+        entityDataMigration.migrate(oRx, progressObserver).toBlocking().last();
 
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
index a1ac4ff..ff15063 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
@@ -177,7 +177,7 @@ public class GeoIT extends AbstractCoreIT {
         Map<String, Object> restaurantProps = new LinkedHashMap<String, Object>();
         restaurantProps.put("name", "Brickhouse");
         restaurantProps.put("address", "426 Brannan Street");
-        restaurantProps.put("location", getLocation(37.779632, -122.395131));
+        restaurantProps.put("location", getLocation(37.776753, -122.407846));
 
         Entity restaurant = em.create("restaurant", restaurantProps);
         assertNotNull(restaurant);
@@ -193,6 +193,7 @@ public class GeoIT extends AbstractCoreIT {
 
         Entity user = em.create("user", userProperties);
         assertNotNull(user);
+        em.refreshIndex();
 
         //3. Create a connection between the user and the entity
         em.createConnection(user, "likes", restaurant);
@@ -200,7 +201,7 @@ public class GeoIT extends AbstractCoreIT {
         em.refreshIndex();
         //4. Test that the user is within 2000m of the entity
         Results emSearchResults = em.searchConnectedEntities(user,
-            Query.fromQL("location within 2000 of "
+            Query.fromQL("location within 5000 of "
                 + ((LinkedHashMap<String, Object>) userProperties.get("location")).get("latitude")
                 + ", " + ((LinkedHashMap<String, Object>)
                         userProperties.get("location")).get("longitude")).setConnectionType("likes"));

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
index bae24f4..7951383 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * Data migration strategy for entities
  */
-public class MvccEntityDataMigrationImpl implements DataMigration {
+public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
 
     private final Keyspace keyspace;
     private final MvccEntityMigrationStrategy entityMigrationStrategy;
@@ -61,59 +62,65 @@ public class MvccEntityDataMigrationImpl implements DataMigration {
     }
 
     @Override
-    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer) {
+    public Observable migrate(final  Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final DataMigration.ProgressObserver observer) {
         final AtomicLong atomicLong = new AtomicLong();
         final MutationBatch totalBatch = keyspace.prepareMutationBatch();
 
-        final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
 
         final UUID now = UUIDGenerator.newTimeUUID();
 
-        //go through each entity in the system, and load it's entire
-        // history
-        return Observable.from(entityIds)
-            .subscribeOn(Schedulers.io())
-            .map(new Func1<EntityIdScope<CollectionScope>, Id>() {
-                @Override
-                public Id call(EntityIdScope<CollectionScope> idScope) {
-
-                    ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
-                    MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = entityMigrationStrategy.getMigration();
-
-                    if (idScope.getCollectionScope() instanceof CollectionScope) {
-                        CollectionScope currentScope =  idScope.getCollectionScope();
-                        //for each element in the history in the previous version,
-                        // copy it to the CF in v2
-                        Iterator<MvccEntity> allVersions = migration.from()
-                            .loadDescendingHistory(currentScope, idScope.getId(), now,
-                                1000);
-
-                        while (allVersions.hasNext()) {
-                            final MvccEntity version = allVersions.next();
-
-                            final MutationBatch versionBatch =
-                                migration.to().write(currentScope, version);
-
-                            totalBatch.mergeShallow(versionBatch);
-
-                            if (atomicLong.incrementAndGet() % 50 == 0) {
+        return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Id>>() {
+            @Override
+            public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
+                final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
+
+                //go through each entity in the system, and load it's entire
+                // history
+                return Observable.from(entityIds)
+                    .subscribeOn(Schedulers.io())
+                    .map(new Func1<EntityIdScope<CollectionScope>, Id>() {
+                        @Override
+                        public Id call(EntityIdScope<CollectionScope> idScope) {
+
+                            MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = entityMigrationStrategy.getMigration();
+
+                            if (idScope.getCollectionScope() instanceof CollectionScope) {
+                                CollectionScope currentScope = idScope.getCollectionScope();
+                                //for each element in the history in the previous version,
+                                // copy it to the CF in v2
+                                Iterator<MvccEntity> allVersions = migration.from()
+                                    .loadDescendingHistory(currentScope, idScope.getId(), now,
+                                        1000);
+
+                                while (allVersions.hasNext()) {
+                                    final MvccEntity version = allVersions.next();
+
+                                    final MutationBatch versionBatch =
+                                        migration.to().write(currentScope, version);
+
+                                    totalBatch.mergeShallow(versionBatch);
+
+                                    if (atomicLong.incrementAndGet() % 50 == 0) {
+                                        executeBatch(totalBatch, observer, atomicLong);
+                                    }
+                                }
                                 executeBatch(totalBatch, observer, atomicLong);
                             }
+
+                            return idScope.getId();
                         }
-                        executeBatch(totalBatch, observer, atomicLong);
-                    }
-
-                    return idScope.getId();
-                }
-            })
-            .buffer(100)
-            .doOnNext(new Action1<List<Id>>() {
-                @Override
-                public void call(List<Id> ids) {
-                    executeBatch(totalBatch, observer, atomicLong);
-
-                }
-            });
+                    })
+                    .buffer(100)
+                    .doOnNext(new Action1<List<Id>>() {
+                        @Override
+                        public void call(List<Id> ids) {
+                            executeBatch(totalBatch, observer, atomicLong);
+
+                        }
+                    });
+            }
+        });
+
     }
 
     protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po, final AtomicLong count ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index c119d84..b3388bc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -23,6 +23,7 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.guice.*;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 
@@ -50,13 +51,13 @@ public class SerializationModule extends AbstractModule {
         bind( MvccEntitySerializationStrategy.class ).annotatedWith( V3Impl.class )
                                                      .to(MvccEntitySerializationStrategyV3Impl.class);
 
-        bind(MvccEntitySerializationStrategy.class).annotatedWith( V1ProxyImpl.class )
+        bind(MvccEntitySerializationStrategy.class).annotatedWith(V1ProxyImpl.class)
                                                      .to(MvccEntitySerializationStrategyProxyV1Impl.class);
-        bind(MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
+        bind(MvccEntitySerializationStrategy.class ).annotatedWith(ProxyImpl.class)
                                                      .to(MvccEntitySerializationStrategyProxyV2Impl.class);
 
-        Multibinder<DataMigration> dataMigrationMultibinder =
-            Multibinder.newSetBinder( binder(), DataMigration.class );
+        Multibinder<CollectionDataMigration> dataMigrationMultibinder =
+            Multibinder.newSetBinder( binder(), CollectionDataMigration.class );
         dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
 
         bind( MvccEntityMigrationStrategy.class ).to(MvccEntitySerializationStrategyProxyV2Impl.class);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index f2adee5..1e428e5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,6 +19,7 @@
 package org.apache.usergrid.persistence.core.guice;
 
 
+import org.apache.usergrid.persistence.core.migration.data.*;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -27,11 +28,6 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
@@ -81,7 +77,8 @@ public class CommonModule extends AbstractModule {
 
 
         //do multibindings for migrations
-        Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+        Multibinder<ApplicationDataMigration> applicationDataMigrationMultibinder = Multibinder.newSetBinder( binder(), ApplicationDataMigration.class );
+        Multibinder<CollectionDataMigration> collectionDataMigrationMultibinder = Multibinder.newSetBinder( binder(), CollectionDataMigration.class );
 //        dataMigrationMultibinder.addBinding();
 //        dataMigrationManagerMultibinder.addBinding().to( DataMigrationManagerImpl.class );
 //        migrationBinding.addBinding().to( Key.get( MigrationInfoSerialization.class ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
new file mode 100644
index 0000000..726ff3a
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration.data;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import rx.Observable;
+
+/**
+ * Migrate applications
+ */
+public interface ApplicationDataMigration extends DataMigration {
+
+    /**
+     * Migrate the data to the specified version
+     * @param observer
+     * @throws Throwable
+     */
+    public Observable migrate(final Observable<ApplicationScope> applicationEntityGroup, final ProgressObserver observer) throws Throwable;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
new file mode 100644
index 0000000..24dab0f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
@@ -0,0 +1,36 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration.data;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
+
+/**
+ * Migrate Collections
+ */
+public interface CollectionDataMigration extends DataMigration {
+    /**
+     * Migrate the data to the specified version
+     * @param observer
+     * @throws Throwable
+     */
+    public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
index cae5623..495552e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
@@ -43,12 +43,6 @@ import rx.Observable;
 public interface DataMigration {
 
 
-    /**
-     * Migrate the data to the specified version
-     * @param observer
-     * @throws Throwable
-     */
-    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup,final ProgressObserver observer) throws Throwable;
 
     /**
      * Get the version of this migration.  It must be unique.
@@ -85,9 +79,9 @@ public interface DataMigration {
 
     public enum MigrationType{
         Entities,
-        Edges,
-        Index,
-        Other
+        Applications,
+        System,
+        Collections
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index b8e4af9..58ba6ff 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -21,29 +21,18 @@ package org.apache.usergrid.persistence.core.migration.data;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
 
 import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.model.entity.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
 
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import rx.Observable;
@@ -63,20 +52,25 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
     private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
-    private final Set<DataMigration> migrations;
     private final ApplicationObservable applicationObservable;
+    private final Set<ApplicationDataMigration> appMigrations;
+    private final Set<CollectionDataMigration> collectionMigrations;
 
 
     @Inject
     public DataMigrationManagerImpl( final MigrationInfoSerialization migrationInfoSerialization,
-                                     final Set<DataMigration> migrations,
+                                     final Set<ApplicationDataMigration> appMigrations,
+                                     final Set<CollectionDataMigration> collectionMigrations,
                                      final AllEntitiesInSystemObservable allEntitiesInSystemObservable,
                                      final ApplicationObservable applicationObservable
     ) {
+        this.appMigrations = appMigrations;
+        this.collectionMigrations = collectionMigrations;
 
         Preconditions.checkNotNull( migrationInfoSerialization,
                 "migrationInfoSerialization must not be null" );
-        Preconditions.checkNotNull( migrations, "migrations must not be null" );
+        Preconditions.checkNotNull( collectionMigrations, "migrations must not be null" );
+        Preconditions.checkNotNull( appMigrations, "migrations must not be null" );
         Preconditions.checkNotNull( allEntitiesInSystemObservable, "allentitiesobservable must not be null" );
         Preconditions.checkNotNull( applicationObservable, "applicationObservable must not be null" );
 
@@ -84,7 +78,6 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
         this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
         this.applicationObservable = applicationObservable;
 
-        this.migrations = migrations;
     }
 
 
@@ -106,100 +99,160 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
                 migrationTreeMap.lastKey() );
 
         //we have our migrations to run, execute them
-        final NavigableMap<Integer, DataMigration> migrationsToRun =
-                migrationTreeMap.tailMap( currentVersion, false );
+        final Collection< DataMigration> migrationsToRun =  migrationTreeMap.tailMap( currentVersion, false ).values();
+
+
 
         final CassandraProgressObserver observer = new CassandraProgressObserver();
 
-        Observable<DataMigration> migrations = Observable.from(migrationsToRun.values()).subscribeOn(Schedulers.io());
-        final Observable appIdObservable = applicationObservable.getAllApplicationIds();
+        final Observable<ApplicationScope> appScopeObservable = applicationObservable.getAllApplicationScopes();
+
+        final Observable<ApplicationEntityGroup> entitiesObservable = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup>>() {
+            @Override
+            public Observable<ApplicationEntityGroup> call(ApplicationScope applicationScope) {
+                return allEntitiesInSystemObservable.getAllEntitiesInSystem(appScopeObservable, 1000);
+            }
+        });
 
-        Observable entityMigrations = migrations
+        final Observable<ApplicationDataMigration> applicationMigrationsRx = Observable.from(migrationsToRun)
             .filter(new Func1<DataMigration, Boolean>() {
                 @Override
                 public Boolean call(DataMigration dataMigration) {
-                    return dataMigration.getType() == DataMigration.MigrationType.Entities;
+                    return dataMigration instanceof ApplicationDataMigration;
                 }
-            }).flatMap(new Func1<DataMigration, Observable<ApplicationEntityGroup>>() {
+            }).map(new Func1<DataMigration, ApplicationDataMigration>() {
                 @Override
-                public Observable<ApplicationEntityGroup> call(final DataMigration dataMigration) {
-                    return allEntitiesInSystemObservable
-                        .getAllEntitiesInSystem(appIdObservable, 1000)
-                        .doOnNext(new Action1<ApplicationEntityGroup>() {
-                            @Override
-                            public void call(ApplicationEntityGroup entityGroup) {
-                                runMigration(dataMigration,observer,entityGroup);
-                            }
-                        });
+                public ApplicationDataMigration call(DataMigration dataMigration) {
+                    return (ApplicationDataMigration)dataMigration;
                 }
             });
 
-        //migrations that aren't entities
-        Observable otherMigrations = migrations
+        final Observable<CollectionDataMigration> collectionMigrationsRx = Observable.from(migrationsToRun)
             .filter(new Func1<DataMigration, Boolean>() {
                 @Override
                 public Boolean call(DataMigration dataMigration) {
-                    return dataMigration.getType() != DataMigration.MigrationType.Entities;
+                    return dataMigration instanceof CollectionDataMigration;
                 }
-            }).flatMap(new Func1<DataMigration, Observable<?>>() {
+            }).map(new Func1<DataMigration, CollectionDataMigration>() {
                 @Override
-                public Observable call(final DataMigration dataMigration) {
-                    return appIdObservable.doOnNext(new Action1<Id>() {
+                public CollectionDataMigration call(DataMigration dataMigration) {
+                    return (CollectionDataMigration) dataMigration;
+                }
+            });
+
+        Observable applications = applicationMigrationsRx
+            .doOnNext(new Action1<ApplicationDataMigration>() {
                         @Override
-                        public void call(Id id) {
-                            ApplicationScope scope = new ApplicationScopeImpl(id);
-                            runMigration(dataMigration, observer, new ApplicationEntityGroup(scope, null));
+                        public void call(ApplicationDataMigration dataMigration) {
+
+                            migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
+
+                            final int migrationVersion = dataMigration.getVersion();
+
+                            LOG.info("Running migration version {}", migrationVersion);
+
+                            observer.update(migrationVersion, "Starting migration");
+
+                            //perform this migration, if it fails, short circuit
+                            try {
+                                dataMigration.migrate(appScopeObservable, observer).toBlocking().lastOrDefault(null);
+                            } catch (Throwable throwable) {
+                                observer.failed(migrationVersion, "Exception thrown during migration", throwable);
+                                LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
+                                throw new RuntimeException(throwable);
+                            }
+
+                            //we had an unhandled exception or the migration failed, short circuit
+                            if (observer.failed) {
+                                return;
+                            }
+
+                            //set the version
+                            migrationInfoSerialization.setVersion(migrationVersion);
+
+                            //update the observer for progress so other nodes can see it
+                            observer.update(migrationVersion, "Completed successfully");
+
+                        }
+                    });
+
+
+        Observable entities =collectionMigrationsRx
+                    .doOnNext(new Action1<CollectionDataMigration>() {
+                        @Override
+                        public void call(CollectionDataMigration dataMigration) {
+                            migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
+
+                            final int migrationVersion = dataMigration.getVersion();
+
+                            LOG.info("Running migration version {}", migrationVersion);
+
+                            observer.update(migrationVersion, "Starting migration");
+
+                            //perform this migration, if it fails, short circuit
+                            try {
+                                dataMigration.migrate(entitiesObservable, observer).toBlocking().lastOrDefault(null);
+                            } catch (Throwable throwable) {
+                                observer.failed(migrationVersion, "Exception thrown during migration", throwable);
+                                LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
+                                throw new RuntimeException(throwable);
+                            }
+
+                            //we had an unhandled exception or the migration failed, short circuit
+                            if (observer.failed) {
+                                return;
+                            }
+
+                            //set the version
+                            migrationInfoSerialization.setVersion(migrationVersion);
+
+                            //update the observer for progress so other nodes can see it
+                            observer.update(migrationVersion, "Completed successfully");
                         }
                     });
 
-                }
-            });
 
         try {
             Observable
-                .merge(entityMigrations, otherMigrations)
+                .merge(applications, entities)
                 .subscribeOn(Schedulers.io())
                 .toBlocking().lastOrDefault(null);
             migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
         } catch (Exception e){
-            LOG.error("Migration Failed");
+            LOG.error("Migration Failed",e);
         }
 
     }
 
-    private void runMigration(DataMigration migration, CassandraProgressObserver observer, ApplicationEntityGroup applicationEntityGroup) {
-        migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
 
-        final int migrationVersion = migration.getVersion();
 
-        LOG.info("Running migration version {}", migrationVersion);
+    private boolean populateTreeMap() {
+        if ( migrationTreeMap.isEmpty() ) {
+            for ( DataMigration migration : appMigrations ) {
 
-        observer.update(migrationVersion, "Starting migration");
+                Preconditions.checkNotNull(migration,
+                    "A migration instance in the set of migrations was null.  This is not allowed");
 
-        //perform this migration, if it fails, short circuit
-        try {
-            migration.migrate(applicationEntityGroup, observer).toBlocking().lastOrDefault(null);
-        } catch (Throwable throwable) {
-            observer.failed(migrationVersion, "Exception thrown during migration", throwable);
-            LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
-            throw new RuntimeException(throwable) ;
-        }
+                final int version = migration.getVersion();
 
-        //we had an unhandled exception or the migration failed, short circuit
-        if (observer.failed) {
-            return ;
-        }
+                final DataMigration existing = migrationTreeMap.get( version );
+
+                if ( existing != null ) {
 
-        //set the version
-        migrationInfoSerialization.setVersion(migrationVersion);
+                    final Class<? extends DataMigration> existingClass = existing.getClass();
 
-        //update the observer for progress so other nodes can see it
-        observer.update(migrationVersion, "Completed successfully");
-    }
+                    final Class<? extends DataMigration> currentClass = migration.getClass();
 
-    private boolean populateTreeMap() {
-        if ( migrationTreeMap.isEmpty() ) {
-            for ( DataMigration migration : migrations ) {
+
+                    throw new DataMigrationException( String.format(
+                        "Data migrations must be unique.  Both classes %s and %s have version %d",
+                        existingClass, currentClass, version ) );
+                }
+
+                migrationTreeMap.put( version, migration );
+            }
+
+            for ( DataMigration migration : collectionMigrations ) {
 
                 Preconditions.checkNotNull(migration,
                     "A migration instance in the set of migrations was null.  This is not allowed");

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
index 141b6c1..e029f92 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -43,10 +43,10 @@ public interface AllEntitiesInSystemObservable<T extends ApplicationScope> {
      * Return an observable that emits all entities in the system.
      *
      * @param appIdObservable list of app ids
-     * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup.  Note that if we exceed the buffer size
-     *                   you may be more than 1 ApplicationEntityGroup with the same application and different ids
+     * @param bufferSize      The amount of entityIds to buffer into each ApplicationEntityGroup.  Note that if we exceed the buffer size
+     *                        you may be more than 1 ApplicationEntityGroup with the same application and different ids
      */
-    public Observable<ApplicationEntityGroup<T>> getAllEntitiesInSystem(Observable<Id> appIdObservable, final int bufferSize);
+    public Observable<ApplicationEntityGroup<T>> getAllEntitiesInSystem(Observable<ApplicationScope> appIdObservable, final int bufferSize);
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
index 09e888d..a229eb6 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
@@ -19,6 +19,7 @@
  */
 package org.apache.usergrid.persistence.core.rx;
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
@@ -30,4 +31,9 @@ public interface ApplicationObservable {
      * Get all applicationIds as an observable
      */
     Observable<Id> getAllApplicationIds();
+
+    /**
+    * get all application scopes
+     */
+    Observable<ApplicationScope> getAllApplicationScopes();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
index 2d91492..4e9ef90 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.core.guice;
 
 
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 
 import com.google.inject.AbstractModule;
@@ -33,7 +34,7 @@ import com.google.inject.multibindings.Multibinder;
 public class MaxMigrationModule extends AbstractModule {
     @Override
     protected void configure() {
-        Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+        Multibinder<ApplicationDataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), ApplicationDataMigration.class );
         dataMigrationMultibinder.addBinding().to( MaxMigrationVersion.class );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
index d903a7d..d3b9b49 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
@@ -20,23 +20,24 @@
 package org.apache.usergrid.persistence.core.guice;
 
 
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import rx.Observable;
 
 
 /**
  * A simple migration that sets the version to max. This way our integration tests always test the latest code
  */
-public class MaxMigrationVersion implements DataMigration {
+public class MaxMigrationVersion implements ApplicationDataMigration {
 
     @Override
-    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer ) throws Throwable {
+    public Observable migrate(final Observable<ApplicationScope> applicationEntityGroup, final ProgressObserver observer) {
          //no op, just needs to run to be set
         return Observable.empty();
     }
 
-
     @Override
     public int getVersion() {
         return Integer.MAX_VALUE;
@@ -44,6 +45,6 @@ public class MaxMigrationVersion implements DataMigration {
 
     @Override
     public MigrationType getType() {
-        return MigrationType.Other;
+        return MigrationType.Applications;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
index 0c7e76b..16092b5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.scope.EntityIdScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -75,6 +76,11 @@ public class DataMigrationManagerImplTest {
         public Observable<Id> getAllApplicationIds() {
             return Observable.just( (Id)new SimpleId("application"));
         }
+
+        @Override
+        public Observable<ApplicationScope> getAllApplicationScopes() {
+            return Observable.just( (ApplicationScope)new ApplicationScopeImpl((Id)new SimpleId("application")));
+        }
     };
 
     @Test
@@ -82,9 +88,10 @@ public class DataMigrationManagerImplTest {
         final MigrationInfoSerialization serialization = mock( MigrationInfoSerialization.class );
         when(serialization.getCurrentVersion()).thenReturn(1);
 
-        Set<DataMigration> emptyMigration = new HashSet<>();
+        Set<CollectionDataMigration> collectionDataMigrations = new HashSet<>();
+        Set<ApplicationDataMigration> emptyMigration = new HashSet<>();
 
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration, allEntitiesInSystemObservable,allApplicationsObservable );
+        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration,collectionDataMigrations, allEntitiesInSystemObservable,allApplicationsObservable );
 
         migrationManager.migrate();
 
@@ -100,27 +107,29 @@ public class DataMigrationManagerImplTest {
         when(serialization.getCurrentVersion()).thenReturn(1);
 
 
-        final DataMigration v1 = mock( DataMigration.class );
+        final ApplicationDataMigration v1 = mock( ApplicationDataMigration.class );
         when( v1.getVersion() ).thenReturn( 2 );
-        when( v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+        when( v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
 
-        final DataMigration v2 = mock( DataMigration.class );
+        final ApplicationDataMigration v2 = mock( ApplicationDataMigration.class );
         when( v2.getVersion() ).thenReturn( 3 );
-        when(v2.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+        when(v2.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
 
 
-        Set<DataMigration> migrations = new HashSet<>();
+        Set<ApplicationDataMigration> migrations = new HashSet<>();
         migrations.add( v1 );
         migrations.add( v2 );
 
+        Set<CollectionDataMigration> collectionDataMigrations = new HashSet<>();
 
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable,allApplicationsObservable );
+
+        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,collectionDataMigrations,allEntitiesInSystemObservable,allApplicationsObservable );
 
         migrationManager.migrate();
 
 
-        verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
-        verify( v2 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
+        verify( v1 ).migrate(any(Observable.class), any( DataMigration.ProgressObserver.class ) );
+        verify( v2 ).migrate(any(Observable.class), any( DataMigration.ProgressObserver.class ) );
 
         //verify we set the running status
         verify( serialization, times( 2 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -144,33 +153,35 @@ public class DataMigrationManagerImplTest {
         when(serialization.getCurrentVersion()).thenReturn(1);
 
 
-        final DataMigration v1 = mock( DataMigration.class,"mock1" );
+        final ApplicationDataMigration v1 = mock( ApplicationDataMigration.class,"mock1" );
         when( v1.getVersion() ).thenReturn( 2 );
         when( v1.getType() ).thenReturn(DataMigration.MigrationType.Entities);
-        when( v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+        when( v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
 
         //throw an exception
-        when( v1.migrate(any(ApplicationEntityGroup.class),
+        when( v1.migrate(any(Observable.class),
                 any(DataMigration.ProgressObserver.class) )).thenThrow(new RuntimeException( "Something bad happened" ));
 
-        final DataMigration v2 = mock( DataMigration.class,"mock2" );
+        final ApplicationDataMigration v2 = mock( ApplicationDataMigration.class,"mock2" );
         when( v2.getType() ).thenReturn(DataMigration.MigrationType.Entities);
         when( v2.getVersion() ).thenReturn( 3 );
 
-        Set<DataMigration> migrations = new HashSet<>();
+        Set<ApplicationDataMigration> migrations = new HashSet<>();
         migrations.add( v1 );
         migrations.add( v2 );
+        Set<CollectionDataMigration> collectionDataMigrations = new HashSet<>();
 
 
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable,allApplicationsObservable );
+        DataMigrationManagerImpl migrationManager
+            = new DataMigrationManagerImpl( serialization, migrations,collectionDataMigrations,allEntitiesInSystemObservable,allApplicationsObservable );
 
         migrationManager.migrate();
 
 
-        verify( v1 ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
+        verify( v1 ).migrate( any(Observable.class),any( DataMigration.ProgressObserver.class ) );
 
         //verify we don't run migration
-        verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
+        verify( v2, never() ).migrate( any(Observable.class),any( DataMigration.ProgressObserver.class ) );
 
         //verify we set the running status
         verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -194,17 +205,17 @@ public class DataMigrationManagerImplTest {
         final MigrationInfoSerialization serialization = mock(MigrationInfoSerialization.class);
         when(serialization.getCurrentVersion()).thenReturn(1);
 
-        final DataMigration v1 = mock( DataMigration.class );
+        final CollectionDataMigration v1 = mock( CollectionDataMigration.class );
         when( v1.getVersion() ).thenReturn( 2 );
         when( v1.getType() ).thenReturn(DataMigration.MigrationType.Entities);
-        when( v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+        when( v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
 
         final int returnedCode = 100;
 
         final String reason = "test reason";
 
         //mark as fail but don't
-        when(v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenAnswer(
+        when(v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenAnswer(
             new Answer<Object>() {
                 @Override
                 public Object answer(final InvocationOnMock invocation) throws Throwable {
@@ -218,25 +229,26 @@ public class DataMigrationManagerImplTest {
 
         );
 
-        final DataMigration v2 = mock( DataMigration.class );
+        final CollectionDataMigration v2 = mock( CollectionDataMigration.class );
         when( v2.getVersion() ).thenReturn( 3 );
         when( v2.getType() ).thenReturn(DataMigration.MigrationType.Entities);
-        when(v2.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+        when(v2.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
 
-        Set<DataMigration> migrations = new HashSet<>();
-        migrations.add( v1 );
-        migrations.add( v2 );
+        Set<CollectionDataMigration> collectionMigrations = new HashSet<>();
+        collectionMigrations.add( v1 );
+        collectionMigrations.add(v2);
+        Set<ApplicationDataMigration> applicationDataMigrations = new HashSet<>();
 
 
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable, allApplicationsObservable );
+        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, applicationDataMigrations,collectionMigrations,allEntitiesInSystemObservable, allApplicationsObservable );
 
         migrationManager.migrate();
 
 
-        verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
+        verify( v1 ).migrate(any(Observable.class), any( DataMigration.ProgressObserver.class ) );
 
         //verify we don't run migration
-        verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
+        verify( v2, never() ).migrate( any(Observable.class),any( DataMigration.ProgressObserver.class ) );
 
         //verify we set the running status
         verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
index c0d52dd..b798c41 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
@@ -19,6 +19,7 @@
  */
 package org.apache.usergrid.persistence.core.rx;
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
@@ -30,4 +31,9 @@ public class ApplicationsTestObservable implements ApplicationObservable {
     public Observable<Id> getAllApplicationIds() {
         return Observable.empty();
     }
+
+    @Override
+    public Observable<ApplicationScope> getAllApplicationScopes() {
+        return Observable.empty();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 5e4a300..b695512 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.guice;
 
 import org.apache.usergrid.persistence.core.guice.V1Impl;
 import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.graph.serialization.*;
 import org.apache.usergrid.persistence.graph.serialization.impl.*;
@@ -116,8 +117,8 @@ public class GraphModule extends AbstractModule {
         bind( EdgeMetaRepair.class ).to( EdgeMetaRepairImpl.class );
         bind( EdgeDeleteRepair.class ).to( EdgeDeleteRepairImpl.class );
 
-        Multibinder<DataMigration> dataMigrationMultibinder =
-            Multibinder.newSetBinder( binder(), DataMigration.class );
+        Multibinder<ApplicationDataMigration> dataMigrationMultibinder =
+            Multibinder.newSetBinder( binder(), ApplicationDataMigration.class );
         dataMigrationMultibinder.addBinding().to( EdgeDataMigrationImpl.class );
 
         /**
@@ -130,11 +131,11 @@ public class GraphModule extends AbstractModule {
 
         bind( EdgeShardStrategy.class ).to( SizebasedEdgeShardStrategy.class );
 
-        bind( ShardedEdgeSerialization.class ).to( ShardedEdgeSerializationImpl.class );
+        bind(ShardedEdgeSerialization.class ).to( ShardedEdgeSerializationImpl.class );
 
         bind( EdgeColumnFamilies.class ).to( SizebasedEdgeColumnFamilies.class );
 
-        bind( ShardGroupCompaction.class ).to( ShardGroupCompactionImpl.class );
+        bind(ShardGroupCompaction.class ).to( ShardGroupCompactionImpl.class );
 
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
index 975e1a2..7d4fb2e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
@@ -23,9 +23,10 @@ import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -44,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * Encapsulates data mi
  */
 
-public class EdgeDataMigrationImpl implements DataMigration {
+public class EdgeDataMigrationImpl implements ApplicationDataMigration {
 
     private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
 
@@ -65,73 +66,62 @@ public class EdgeDataMigrationImpl implements DataMigration {
         this.edgesFromSourceObservable = edgesFromSourceObservable;
         this.edgeMigrationStrategy = edgeMigrationStrategy;
     }
-    @Override
-    public Observable<Long> migrate(final ApplicationEntityGroup applicationEntityGroup,
-                                    final DataMigration.ProgressObserver observer) {
-        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationEntityGroup.applicationScope);
-        final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
 
+
+    @Override
+    public Observable migrate(final Observable<ApplicationScope> scopes,
+                              final DataMigration.ProgressObserver observer) {
         final AtomicLong counter = new AtomicLong();
-        rx.Observable o =
-            Observable
-                .from(applicationEntityGroup.entityIds)
-                .flatMap(new Func1<EntityIdScope, Observable<List<Edge>>>() {
-                    //for each id in the group, get it's edges
-                    @Override
-                    public Observable<List<Edge>> call(final EntityIdScope idScope) {
-                        logger.info("Migrating edges from node {} in scope {}", idScope.getId(),
-                            applicationEntityGroup.applicationScope);
-
-                        //get each edge from this node as a source
-                        return edgesFromSource
-
-                            //for each edge, re-index it in v2  every 1000 edges or less
-                            .buffer(1000)
-                            .doOnNext(new Action1<List<Edge>>() {
-                                @Override
-                                public void call(List<Edge> edges) {
-                                    final MutationBatch batch =
-                                        keyspace.prepareMutationBatch();
-
-                                    for (Edge edge : edges) {
-                                        logger.info("Migrating meta for edge {}", edge);
-                                        final MutationBatch edgeBatch = edgeMigrationStrategy.getMigration().to()
-                                            .writeEdge(
-                                                applicationEntityGroup
-                                                    .applicationScope,
-                                                edge);
-                                        batch.mergeShallow(edgeBatch);
-                                    }
-
-                                    try {
-                                        batch.execute();
-                                    } catch (ConnectionException e) {
-                                        throw new RuntimeException(
-                                            "Unable to perform migration", e);
-                                    }
-
-                                    //update the observer so the admin can see it
-                                    final long newCount =
-                                        counter.addAndGet(edges.size());
-
-                                    observer.update(getVersion(), String.format(
-                                        "Currently running.  Rewritten %d edge types",
-                                        newCount));
-                                }
-                            });
-                    }
-
-
-                })
-                .map(new Func1<List<Edge>, Long>() {
-                    @Override
-                    public Long call(List<Edge> edges) {
-                        return counter.get();
-                    }
-                });
-        return o;
+
+        return scopes.flatMap(new Func1<ApplicationScope, Observable<?>>() {
+            @Override
+            public Observable call(final ApplicationScope applicationScope) {
+                final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+                final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationScope.getApplication());
+                logger.info("Migrating edges scope {}", applicationScope);
+
+                //get each edge from this node as a source
+                return edgesFromSource
+
+                    //for each edge, re-index it in v2  every 1000 edges or less
+                    .buffer(1000)
+                    .doOnNext(new Action1<List<Edge>>() {
+                        @Override
+                        public void call(List<Edge> edges) {
+                            final MutationBatch batch =
+                                keyspace.prepareMutationBatch();
+
+                            for (Edge edge : edges) {
+                                logger.info("Migrating meta for edge {}", edge);
+                                final MutationBatch edgeBatch = edgeMigrationStrategy.getMigration().to()
+                                    .writeEdge(applicationScope,
+                                        edge);
+                                batch.mergeShallow(edgeBatch);
+                            }
+
+                            try {
+                                batch.execute();
+                            } catch (ConnectionException e) {
+                                throw new RuntimeException(
+                                    "Unable to perform migration", e);
+                            }
+
+                            //update the observer so the admin can see it
+                            final long newCount =
+                                counter.addAndGet(edges.size());
+
+                            observer.update(getVersion(), String.format(
+                                "Currently running.  Rewritten %d edge types",
+                                newCount));
+                        }
+                    });
+            }
+        });
+
     }
 
+
+
     @Override
     public int getVersion() {
         return edgeMigrationStrategy.getVersion();
@@ -139,6 +129,6 @@ public class EdgeDataMigrationImpl implements DataMigration {
 
     @Override
     public MigrationType getType() {
-        return MigrationType.Edges;
+        return MigrationType.Applications;
     }
 }